From 30992dd032b9920b944f1928d44f518064fbdca0 Mon Sep 17 00:00:00 2001 From: Minh Nguyen <148210689+pipiland2612@users.noreply.github.com> Date: Tue, 4 Nov 2025 10:16:57 +0200 Subject: [PATCH] [RW2] Fix: Only update metadata to WAL when metadata-wal-records feature is enabled (#17470) * add feature check when UpdateMetadata Signed-off-by: pipiland2612 * add appendMetadata boolean to write_handler Signed-off-by: pipiland2612 * fix Signed-off-by: pipiland2612 --------- Signed-off-by: pipiland2612 --- cmd/prometheus/main.go | 1 + storage/remote/write_handler.go | 18 +++++--- storage/remote/write_handler_test.go | 67 +++++++++++++++++++++------- web/api/v1/api.go | 3 +- web/api/v1/errors_test.go | 1 + web/web.go | 2 + 6 files changed, 70 insertions(+), 22 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b7daf2bc05..d108e4c7a2 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -234,6 +234,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { logger.Info("Experimental additional scrape metrics enabled") case "metadata-wal-records": c.scrape.AppendMetadata = true + c.web.AppendMetadata = true logger.Info("Experimental metadata records in WAL enabled") case "promql-per-step-stats": c.enablePerStepStats = true diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index c19b499e1f..e8559dd00e 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -55,6 +55,7 @@ type writeHandler struct { ingestCTZeroSample bool enableTypeAndUnitLabels bool + appendMetadata bool } const maxAheadTime = 10 * time.Minute @@ -64,7 +65,7 @@ const maxAheadTime = 10 * time.Minute // // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. 
-func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestCTZeroSample, enableTypeAndUnitLabels bool) http.Handler { +func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestCTZeroSample, enableTypeAndUnitLabels, appendMetadata bool) http.Handler { h := &writeHandler{ logger: logger, appendable: appendable, @@ -83,6 +84,7 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable ingestCTZeroSample: ingestCTZeroSample, enableTypeAndUnitLabels: enableTypeAndUnitLabels, + appendMetadata: appendMetadata, } return remoteapi.NewWriteHandler(h, acceptedMsgs, remoteapi.WithWriteHandlerLogger(logger)) } @@ -448,11 +450,15 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) } - if _, err = app.UpdateMetadata(ref, ls, m); err != nil { - h.logger.Debug("error while updating metadata from remote write", "err", err) - // Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, - // we don't report remote write error either. We increment metric instead. - samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar + // Only update metadata in WAL if the metadata-wal-records feature is enabled. + // Without this feature, metadata is not persisted to WAL. + if h.appendMetadata { + if _, err = app.UpdateMetadata(ref, ls, m); err != nil { + h.logger.Debug("error while updating metadata from remote write", "err", err) + // Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, + // we don't report remote write error either. We increment metric instead. 
+ samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar + } } } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 491f82fd7b..536fba63cd 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -237,7 +237,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -272,7 +272,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -301,7 +301,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { // in Prometheus, so keeping like this to not break existing 1.0 clients. 
appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -365,6 +365,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { ingestCTZeroSample bool enableTypeAndUnitLabels bool + appendMetadata bool expectedLabels labels.Labels // For verifying type/unit labels }{ { @@ -598,6 +599,37 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { enableTypeAndUnitLabels: false, expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), }, + { + desc: "Metadata-wal-records disabled - metadata should not be stored in WAL", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"), nil) + helpRef := symbolTable.Symbolize("Test metric for WAL verification") + unitRef := symbolTable.Symbolize("seconds") + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + HelpRef: helpRef, + UnitRef: unitRef, + }, + Samples: []writev2.Sample{{Value: 42.0, Timestamp: 2000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"), nil) + symbolTable.Symbolize("Test metric for WAL verification") + symbolTable.Symbolize("seconds") + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: false, + appendMetadata: false, + expectedLabels: labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"), + }, { desc: "Type and unit 
labels enabled but no metadata", input: func() []writev2.TimeSeries { @@ -674,7 +706,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { appendExemplarErr: tc.appendExemplarErr, updateMetadataErr: tc.updateMetadataErr, } - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels, tc.appendMetadata) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -766,12 +798,17 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { j++ } } - if tc.updateMetadataErr == nil { + if tc.appendMetadata && tc.updateMetadataErr == nil { expectedMeta := ts.ToMetadata(writeV2RequestFixture.Symbols) requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m]) m++ } } + + // Verify that when the feature flag is disabled, no metadata is stored in WAL. 
+ if !tc.appendMetadata { + require.Empty(t, appendable.metadata, "metadata should not be stored when appendMetadata (metadata-wal-records) is false") + } }) } } @@ -802,7 +839,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -844,7 +881,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -882,7 +919,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -932,7 +969,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { for _, tc := 
range testCases { b.Run(tc.name, func(b *testing.B) { appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false, false) b.ResetTimer() for b.Loop() { b.StopTimer() @@ -957,7 +994,7 @@ func TestCommitErr_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -1023,7 +1060,7 @@ func TestHistogramValidationErrorHandling(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) }) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false, false) recorder := httptest.NewRecorder() var buf []byte @@ -1068,7 +1105,7 @@ func TestCommitErr_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -1095,7 +1132,7 @@ func 
BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) @@ -1426,7 +1463,7 @@ func TestHistogramsReduction(t *testing.T) { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { t.Run(string(protoMsg), func(t *testing.T) { appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false, false) var ( err error diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 645d18f751..baddedd495 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -293,6 +293,7 @@ func NewAPI( ctZeroIngestionEnabled bool, lookbackDelta time.Duration, enableTypeAndUnitLabels bool, + appendMetadata bool, overrideErrorCode OverrideErrorCode, ) *API { a := &API{ @@ -338,7 +339,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels, appendMetadata) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 
f785093ee7..c44444404b 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -166,6 +166,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable, overri false, 5*time.Minute, false, + false, overrideErrorCode, ) diff --git a/web/web.go b/web/web.go index f6f87e30fe..2d353a8af8 100644 --- a/web/web.go +++ b/web/web.go @@ -295,6 +295,7 @@ type Options struct { IsAgent bool CTZeroIngestionEnabled bool EnableTypeAndUnitLabels bool + AppendMetadata bool AppName string AcceptRemoteWriteProtoMsgs remoteapi.MessageTypes @@ -396,6 +397,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.CTZeroIngestionEnabled, o.LookbackDelta, o.EnableTypeAndUnitLabels, + o.AppendMetadata, nil, )