From 16a9a827de85a0440177fca15ecc5fbca49569db Mon Sep 17 00:00:00 2001 From: harsh kumar <135993950+hxrshxz@users.noreply.github.com> Date: Wed, 15 Oct 2025 22:49:41 +0530 Subject: [PATCH] remote-write: Add type and unit labels to 2.0 receiver when feature flag enabled (#17329) * feat(remote): add support for type and unit labels in write handler Signed-off-by: Harsh * minor fixes Signed-off-by: Harsh * fix failing tests Signed-off-by: Harsh * Update storage/remote/write_handler.go Co-authored-by: Bartlomiej Plotka Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com> * Update storage/remote/write_handler.go Co-authored-by: Bartlomiej Plotka Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com> * refactor: streamline label handling for type and unit in write handler tests Signed-off-by: Harsh * test: enhance V2 message tests for type and unit labels Signed-off-by: Harsh --------- Signed-off-by: Harsh Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com> Co-authored-by: Bartlomiej Plotka --- storage/remote/write_handler.go | 22 +++- storage/remote/write_handler_test.go | 179 ++++++++++++++++++++++++--- web/api/v1/api.go | 2 +- 3 files changed, 182 insertions(+), 21 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index a7cd82ffc0..d92ceaecea 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/schema" "github.com/prometheus/prometheus/storage" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "github.com/prometheus/prometheus/util/compression" @@ -55,7 +56,8 @@ type writeHandler struct { acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} - ingestCTZeroSample bool + ingestCTZeroSample bool + enableTypeAndUnitLabels bool } const maxAheadTime = 10 * time.Minute @@ -65,7 +67,7 @@ const maxAheadTime = 10 * time.Minute // // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. -func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler { +func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample, enableTypeAndUnitLabels bool) http.Handler { protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} for _, acc := range acceptedProtoMsgs { protoMsgs[acc] = struct{}{} @@ -87,7 +89,8 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", }), - ingestCTZeroSample: ingestCTZeroSample, + ingestCTZeroSample: ingestCTZeroSample, + enableTypeAndUnitLabels: enableTypeAndUnitLabels, } return h } @@ -417,6 +420,18 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms) continue } + + m := ts.ToMetadata(req.Symbols) + if h.enableTypeAndUnitLabels && (m.Type != model.MetricTypeUnknown || m.Unit != "") { + slb := labels.NewScratchBuilder(ls.Len() + 2) // +2 for __type__ and __unit__ + ls.Range(func(l labels.Label) { + slb.Add(l.Name, l.Value) + }) + schema.Metadata{Type: m.Type, Unit: m.Unit}.AddToLabels(&slb) + slb.Sort() + ls = slb.Labels() + } + // Validate series labels early. // NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose // specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case. @@ -535,7 +550,6 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) } - m := ts.ToMetadata(req.Symbols) if _, err = app.UpdateMetadata(ref, ls, m); err != nil { h.logger.Debug("error while updating metadata from remote write", "err", err) // Metadata is attached to each series, so since Prometheus does not reject sample without metadata information, diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index c651318d00..5f389cc745 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -237,7 +237,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -272,7 +272,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -301,7 +301,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { // in Prometheus, so keeping like this to not break existing 1.0 clients. appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -352,6 +352,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { for _, tc := range []struct { desc string input []writev2.TimeSeries + symbols []string // Custom symbol table for tests that need it expectedCode int expectedRespBody string @@ -362,7 +363,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { appendExemplarErr error updateMetadataErr error - ingestCTZeroSample bool + ingestCTZeroSample bool + enableTypeAndUnitLabels bool + expectedLabels labels.Labels // For verifying type/unit labels }{ { desc: "All timeseries accepted/ct_enabled", @@ -513,9 +516,147 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode: http.StatusInternalServerError, expectedRespBody: "storage error\n", }, + // Type and unit labels tests + { + desc: "Type and unit labels enabled with counter and bytes unit", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + unitRef := symbolTable.Symbolize("bytes") + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, + UnitRef: unitRef, + }, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + symbolTable.Symbolize("bytes") + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: true, + expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "counter", "__unit__", "bytes", "foo", "bar"), + }, + { + desc: "Type and unit labels enabled with gauge and seconds unit", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + unitRef := symbolTable.Symbolize("seconds") + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, + UnitRef: unitRef, + }, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + symbolTable.Symbolize("seconds") + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: true, + expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "gauge", "__unit__", "seconds", "foo", "bar"), + }, + { + desc: "Type and unit labels disabled - no metadata labels", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + unitRef := symbolTable.Symbolize("bytes") + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, + UnitRef: unitRef, + }, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + symbolTable.Symbolize("bytes") + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: false, + expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), + }, + { + desc: "Type and unit labels enabled but no metadata", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + UnitRef: 0, + }, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: true, + expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"), + }, + { + desc: "Type and unit labels enabled with only unit (no type)", + input: func() []writev2.TimeSeries { + symbolTable := writev2.NewSymbolTable() + labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + unitRef := symbolTable.Symbolize("milliseconds") + return []writev2.TimeSeries{ + { + LabelsRefs: labelRefs, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED, + UnitRef: unitRef, + }, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + } + }(), + symbols: func() []string { + symbolTable := writev2.NewSymbolTable() + symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil) + symbolTable.Symbolize("milliseconds") + return symbolTable.Symbols() + }(), + expectedCode: http.StatusNoContent, + enableTypeAndUnitLabels: true, + expectedLabels: labels.FromStrings("__name__", "test_metric", "__unit__", "milliseconds", "foo", "bar"), + }, } { t.Run(tc.desc, func(t *testing.T) { - payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), tc.input, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy") + symbols := writeV2RequestFixture.Symbols + if tc.symbols != nil { + symbols = tc.symbols + } + payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), tc.input, symbols, nil, nil, nil, "snappy") require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(payload)) @@ -533,7 +674,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { appendExemplarErr: tc.appendExemplarErr, updateMetadataErr: tc.updateMetadataErr, } - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -557,6 +698,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { return } + if !tc.expectedLabels.IsEmpty() { + require.Len(t, appendable.samples, 1) + testutil.RequireEqual(t, tc.expectedLabels, appendable.samples[0].l) + return + } + // Double check mandatory 2.0 stats. // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) @@ -655,7 +802,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -697,7 +844,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -735,7 +882,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -785,7 +932,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { for _, tc := range testCases { b.Run(tc.name, func(b *testing.B) { appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{tc.protoFormat}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{tc.protoFormat}, false, false) b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() @@ -810,7 +957,7 @@ func TestCommitErr_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -876,7 +1023,7 @@ func TestHistogramValidationErrorHandling(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) }) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{protoMsg}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{protoMsg}, false, false) recorder := httptest.NewRecorder() var buf []byte @@ -921,7 +1068,7 @@ func TestCommitErr_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -948,7 +1095,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) @@ -1279,7 +1426,7 @@ func TestHistogramsReduction(t *testing.T) { for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} { t.Run(string(protoMsg), func(t *testing.T) { appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false, false) var ( err error diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 7ea81e70c6..6fdf459c4f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -335,7 +335,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{