[RW2] Fix: Only update metadata to WAL when metadata-wal-records feature is enabled (#17470)

* add feature check when UpdateMetadata

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* add appendMetadata boolean to write_hander

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

* fix

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>

---------

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
Minh Nguyen 2025-11-04 10:16:57 +02:00 committed by GitHub
parent 48956f60d7
commit 30992dd032
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 70 additions and 22 deletions

View File

@ -234,6 +234,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
logger.Info("Experimental additional scrape metrics enabled") logger.Info("Experimental additional scrape metrics enabled")
case "metadata-wal-records": case "metadata-wal-records":
c.scrape.AppendMetadata = true c.scrape.AppendMetadata = true
c.web.AppendMetadata = true
logger.Info("Experimental metadata records in WAL enabled") logger.Info("Experimental metadata records in WAL enabled")
case "promql-per-step-stats": case "promql-per-step-stats":
c.enablePerStepStats = true c.enablePerStepStats = true

View File

@ -55,6 +55,7 @@ type writeHandler struct {
ingestCTZeroSample bool ingestCTZeroSample bool
enableTypeAndUnitLabels bool enableTypeAndUnitLabels bool
appendMetadata bool
} }
const maxAheadTime = 10 * time.Minute const maxAheadTime = 10 * time.Minute
@ -64,7 +65,7 @@ const maxAheadTime = 10 * time.Minute
// //
// NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible
// as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write.
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestCTZeroSample, enableTypeAndUnitLabels bool) http.Handler { func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestCTZeroSample, enableTypeAndUnitLabels, appendMetadata bool) http.Handler {
h := &writeHandler{ h := &writeHandler{
logger: logger, logger: logger,
appendable: appendable, appendable: appendable,
@ -83,6 +84,7 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable
ingestCTZeroSample: ingestCTZeroSample, ingestCTZeroSample: ingestCTZeroSample,
enableTypeAndUnitLabels: enableTypeAndUnitLabels, enableTypeAndUnitLabels: enableTypeAndUnitLabels,
appendMetadata: appendMetadata,
} }
return remoteapi.NewWriteHandler(h, acceptedMsgs, remoteapi.WithWriteHandlerLogger(logger)) return remoteapi.NewWriteHandler(h, acceptedMsgs, remoteapi.WithWriteHandlerLogger(logger))
} }
@ -448,11 +450,15 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
} }
if _, err = app.UpdateMetadata(ref, ls, m); err != nil { // Only update metadata in WAL if the metadata-wal-records feature is enabled.
h.logger.Debug("error while updating metadata from remote write", "err", err) // Without this feature, metadata is not persisted to WAL.
// Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, if h.appendMetadata {
// we don't report remote write error either. We increment metric instead. if _, err = app.UpdateMetadata(ref, ls, m); err != nil {
samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar h.logger.Debug("error while updating metadata from remote write", "err", err)
// Metadata is attached to each series, so since Prometheus does not reject sample without metadata information,
// we don't report remote write error either. We increment metric instead.
samplesWithoutMetadata += rs.AllSamples() - allSamplesSoFar
}
} }
} }

View File

@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -237,7 +237,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -272,7 +272,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -301,7 +301,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
// in Prometheus, so keeping like this to not break existing 1.0 clients. // in Prometheus, so keeping like this to not break existing 1.0 clients.
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -365,6 +365,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
ingestCTZeroSample bool ingestCTZeroSample bool
enableTypeAndUnitLabels bool enableTypeAndUnitLabels bool
appendMetadata bool
expectedLabels labels.Labels // For verifying type/unit labels expectedLabels labels.Labels // For verifying type/unit labels
}{ }{
{ {
@ -598,6 +599,37 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
enableTypeAndUnitLabels: false, enableTypeAndUnitLabels: false,
expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"),
}, },
{
desc: "Metadata-wal-records disabled - metadata should not be stored in WAL",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"), nil)
helpRef := symbolTable.Symbolize("Test metric for WAL verification")
unitRef := symbolTable.Symbolize("seconds")
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
HelpRef: helpRef,
UnitRef: unitRef,
},
Samples: []writev2.Sample{{Value: 42.0, Timestamp: 2000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"), nil)
symbolTable.Symbolize("Test metric for WAL verification")
symbolTable.Symbolize("seconds")
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: false,
appendMetadata: false,
expectedLabels: labels.FromStrings("__name__", "test_metric_wal", "instance", "localhost"),
},
{ {
desc: "Type and unit labels enabled but no metadata", desc: "Type and unit labels enabled but no metadata",
input: func() []writev2.TimeSeries { input: func() []writev2.TimeSeries {
@ -674,7 +706,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
appendExemplarErr: tc.appendExemplarErr, appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr, updateMetadataErr: tc.updateMetadataErr,
} }
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels, tc.appendMetadata)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -766,12 +798,17 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
j++ j++
} }
} }
if tc.updateMetadataErr == nil { if tc.appendMetadata && tc.updateMetadataErr == nil {
expectedMeta := ts.ToMetadata(writeV2RequestFixture.Symbols) expectedMeta := ts.ToMetadata(writeV2RequestFixture.Symbols)
requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m]) requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m])
m++ m++
} }
} }
// Verify that when the feature flag is disabled, no metadata is stored in WAL.
if !tc.appendMetadata {
require.Empty(t, appendable.metadata, "metadata should not be stored when appendMetadata (metadata-wal-records) is false")
}
}) })
} }
} }
@ -802,7 +839,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -844,7 +881,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -882,7 +919,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -932,7 +969,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
for _, tc := range testCases { for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) { b.Run(tc.name, func(b *testing.B) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false, false)
b.ResetTimer() b.ResetTimer()
for b.Loop() { for b.Loop() {
b.StopTimer() b.StopTimer()
@ -957,7 +994,7 @@ func TestCommitErr_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{commitErr: errors.New("commit error")} appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -1023,7 +1060,7 @@ func TestHistogramValidationErrorHandling(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) }) t.Cleanup(func() { require.NoError(t, db.Close()) })
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
var buf []byte var buf []byte
@ -1068,7 +1105,7 @@ func TestCommitErr_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{commitErr: errors.New("commit error")} appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -1095,7 +1132,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close()) require.NoError(b, db.Close())
}) })
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
require.NoError(b, err) require.NoError(b, err)
@ -1426,7 +1463,7 @@ func TestHistogramsReduction(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} { for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(string(protoMsg), func(t *testing.T) { t.Run(string(protoMsg), func(t *testing.T) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false, false)
var ( var (
err error err error

View File

@ -293,6 +293,7 @@ func NewAPI(
ctZeroIngestionEnabled bool, ctZeroIngestionEnabled bool,
lookbackDelta time.Duration, lookbackDelta time.Duration,
enableTypeAndUnitLabels bool, enableTypeAndUnitLabels bool,
appendMetadata bool,
overrideErrorCode OverrideErrorCode, overrideErrorCode OverrideErrorCode,
) *API { ) *API {
a := &API{ a := &API{
@ -338,7 +339,7 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels) a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels, appendMetadata)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{

View File

@ -166,6 +166,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable, overri
false, false,
5*time.Minute, 5*time.Minute,
false, false,
false,
overrideErrorCode, overrideErrorCode,
) )

View File

@ -295,6 +295,7 @@ type Options struct {
IsAgent bool IsAgent bool
CTZeroIngestionEnabled bool CTZeroIngestionEnabled bool
EnableTypeAndUnitLabels bool EnableTypeAndUnitLabels bool
AppendMetadata bool
AppName string AppName string
AcceptRemoteWriteProtoMsgs remoteapi.MessageTypes AcceptRemoteWriteProtoMsgs remoteapi.MessageTypes
@ -396,6 +397,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
o.CTZeroIngestionEnabled, o.CTZeroIngestionEnabled,
o.LookbackDelta, o.LookbackDelta,
o.EnableTypeAndUnitLabels, o.EnableTypeAndUnitLabels,
o.AppendMetadata,
nil, nil,
) )