remote-write: Add type and unit labels to 2.0 receiver when feature flag enabled (#17329)

* feat(remote): add support for type and unit labels in write handler

Signed-off-by: Harsh <harshmastic@gmail.com>

* minor fixes

Signed-off-by: Harsh <harshmastic@gmail.com>

* fix failing tests

Signed-off-by: Harsh <harshmastic@gmail.com>

* Update storage/remote/write_handler.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com>

* Update storage/remote/write_handler.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com>

* refactor: streamline label handling for type and unit in write handler tests

Signed-off-by: Harsh <harshmastic@gmail.com>

* test: enhance V2 message tests for type and unit labels

Signed-off-by: Harsh <harshmastic@gmail.com>

---------

Signed-off-by: Harsh <harshmastic@gmail.com>
Signed-off-by: harsh kumar <135993950+hxrshxz@users.noreply.github.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
harsh kumar 2025-10-15 22:49:41 +05:30 committed by GitHub
parent e67218a39e
commit 16a9a827de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 182 additions and 21 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/schema"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/compression"
@ -55,7 +56,8 @@ type writeHandler struct {
acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{}
ingestCTZeroSample bool
ingestCTZeroSample bool
enableTypeAndUnitLabels bool
}
const maxAheadTime = 10 * time.Minute
@ -65,7 +67,7 @@ const maxAheadTime = 10 * time.Minute
//
// NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible
// as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write.
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler {
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample, enableTypeAndUnitLabels bool) http.Handler {
protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{}
for _, acc := range acceptedProtoMsgs {
protoMsgs[acc] = struct{}{}
@ -87,7 +89,8 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable
Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.",
}),
ingestCTZeroSample: ingestCTZeroSample,
ingestCTZeroSample: ingestCTZeroSample,
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
}
return h
}
@ -417,6 +420,18 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms)
continue
}
m := ts.ToMetadata(req.Symbols)
if h.enableTypeAndUnitLabels && (m.Type != model.MetricTypeUnknown || m.Unit != "") {
slb := labels.NewScratchBuilder(ls.Len() + 2) // +2 for __type__ and __unit__
ls.Range(func(l labels.Label) {
slb.Add(l.Name, l.Value)
})
schema.Metadata{Type: m.Type, Unit: m.Unit}.AddToLabels(&slb)
slb.Sort()
ls = slb.Labels()
}
// Validate series labels early.
// NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose
// specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case.
@ -535,7 +550,6 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
h.logger.Error("failed to ingest exemplar, emitting error log, but no error for PRW caller", "err", err.Error(), "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e))
}
m := ts.ToMetadata(req.Symbols)
if _, err = app.UpdateMetadata(ref, ls, m); err != nil {
h.logger.Debug("error while updating metadata from remote write", "err", err)
// Metadata is attached to each series, so since Prometheus does not reject sample without metadata information,

View File

@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
}
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -237,7 +237,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
}
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -272,7 +272,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
}
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -301,7 +301,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
// in Prometheus, so keeping like this to not break existing 1.0 clients.
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -352,6 +352,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
for _, tc := range []struct {
desc string
input []writev2.TimeSeries
symbols []string // Custom symbol table for tests that need it
expectedCode int
expectedRespBody string
@ -362,7 +363,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
appendExemplarErr error
updateMetadataErr error
ingestCTZeroSample bool
ingestCTZeroSample bool
enableTypeAndUnitLabels bool
expectedLabels labels.Labels // For verifying type/unit labels
}{
{
desc: "All timeseries accepted/ct_enabled",
@ -513,9 +516,147 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
expectedCode: http.StatusInternalServerError,
expectedRespBody: "storage error\n",
},
// Type and unit labels tests
{
desc: "Type and unit labels enabled with counter and bytes unit",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
unitRef := symbolTable.Symbolize("bytes")
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
UnitRef: unitRef,
},
Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
symbolTable.Symbolize("bytes")
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: true,
expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "counter", "__unit__", "bytes", "foo", "bar"),
},
{
desc: "Type and unit labels enabled with gauge and seconds unit",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
unitRef := symbolTable.Symbolize("seconds")
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
UnitRef: unitRef,
},
Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
symbolTable.Symbolize("seconds")
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: true,
expectedLabels: labels.FromStrings("__name__", "test_metric", "__type__", "gauge", "__unit__", "seconds", "foo", "bar"),
},
{
desc: "Type and unit labels disabled - no metadata labels",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
unitRef := symbolTable.Symbolize("bytes")
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER,
UnitRef: unitRef,
},
Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
symbolTable.Symbolize("bytes")
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: false,
expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"),
},
{
desc: "Type and unit labels enabled but no metadata",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
UnitRef: 0,
},
Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: true,
expectedLabels: labels.FromStrings("__name__", "test_metric", "foo", "bar"),
},
{
desc: "Type and unit labels enabled with only unit (no type)",
input: func() []writev2.TimeSeries {
symbolTable := writev2.NewSymbolTable()
labelRefs := symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
unitRef := symbolTable.Symbolize("milliseconds")
return []writev2.TimeSeries{
{
LabelsRefs: labelRefs,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED,
UnitRef: unitRef,
},
Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}},
},
}
}(),
symbols: func() []string {
symbolTable := writev2.NewSymbolTable()
symbolTable.SymbolizeLabels(labels.FromStrings("__name__", "test_metric", "foo", "bar"), nil)
symbolTable.Symbolize("milliseconds")
return symbolTable.Symbols()
}(),
expectedCode: http.StatusNoContent,
enableTypeAndUnitLabels: true,
expectedLabels: labels.FromStrings("__name__", "test_metric", "__unit__", "milliseconds", "foo", "bar"),
},
} {
t.Run(tc.desc, func(t *testing.T) {
payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), tc.input, writeV2RequestFixture.Symbols, nil, nil, nil, "snappy")
symbols := writeV2RequestFixture.Symbols
if tc.symbols != nil {
symbols = tc.symbols
}
payload, _, _, err := buildV2WriteRequest(promslog.NewNopLogger(), tc.input, symbols, nil, nil, nil, "snappy")
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(payload))
@ -533,7 +674,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr,
}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample, tc.enableTypeAndUnitLabels)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -557,6 +698,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
return
}
if !tc.expectedLabels.IsEmpty() {
require.Len(t, appendable.samples, 1)
testutil.RequireEqual(t, tc.expectedLabels, appendable.samples[0].l)
return
}
// Double check mandatory 2.0 stats.
// writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each.
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader))
@ -655,7 +802,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -697,7 +844,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -735,7 +882,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -785,7 +932,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{tc.protoFormat}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{tc.protoFormat}, false, false)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
@ -810,7 +957,7 @@ func TestCommitErr_V1Message(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -876,7 +1023,7 @@ func TestHistogramValidationErrorHandling(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{protoMsg}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{protoMsg}, false, false)
recorder := httptest.NewRecorder()
var buf []byte
@ -921,7 +1068,7 @@ func TestCommitErr_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false, false)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -948,7 +1095,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
// TODO: test with other proto format(s)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false, false)
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
require.NoError(b, err)
@ -1279,7 +1426,7 @@ func TestHistogramsReduction(t *testing.T) {
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(string(protoMsg), func(t *testing.T) {
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false, false)
var (
err error

View File

@ -335,7 +335,7 @@ func NewAPI(
}
if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled, enableTypeAndUnitLabels)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{