mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-06 06:07:11 +02:00
PROM-39: Add type and unit labels to OTLP endpoint (#16630)
* PROM-39: Add type and unit labels to OTLP endpoint Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Extract label addition into helper function Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Wire feature flag and web handler configuration Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Apply suggestions from code review Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Use lowercase for units too Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Use otlptranslator.UnitNamer to build units Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Address copilot's comment Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Verify label presence before adding them Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Overwrite type/unit labels when already set Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * sed/addTypeAndUnitLabels/enableTypeAndUnitLabels/ Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> * Reduce duplicated code Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> --------- Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com> Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
44b0fbba1e
commit
2c04f2d7b1
@ -292,6 +292,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
|
||||
logger.Info("Enabling native ingestion of delta OTLP metrics, storing the raw sample values without conversion. WARNING: Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")
|
||||
case "type-and-unit-labels":
|
||||
c.scrape.EnableTypeAndUnitLabels = true
|
||||
c.web.EnableTypeAndUnitLabels = true
|
||||
logger.Info("Experimental type and unit labels enabled")
|
||||
case "use-uncached-io":
|
||||
c.tsdb.UseUncachedIO = true
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
@ -118,7 +119,7 @@ var seps = []byte{'\xff'}
|
||||
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
|
||||
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
|
||||
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings,
|
||||
ignoreAttrs []string, logOnOverwrite bool, extras ...string,
|
||||
ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, extras ...string,
|
||||
) []prompb.Label {
|
||||
resourceAttrs := resource.Attributes()
|
||||
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
|
||||
@ -142,6 +143,9 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
|
||||
if haveInstanceID {
|
||||
maxLabelCount++
|
||||
}
|
||||
if settings.EnableTypeAndUnitLabels {
|
||||
maxLabelCount += 2
|
||||
}
|
||||
|
||||
// Ensure attributes are sorted by key for consistent merging of keys which
|
||||
// collide when sanitized.
|
||||
@ -186,6 +190,16 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
|
||||
l["otel_scope_schema_url"] = scope.schemaURL
|
||||
}
|
||||
|
||||
if settings.EnableTypeAndUnitLabels {
|
||||
unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8}
|
||||
if metadata.Type != prompb.MetricMetadata_UNKNOWN {
|
||||
l["__type__"] = strings.ToLower(metadata.Type.String())
|
||||
}
|
||||
if metadata.Unit != "" {
|
||||
l["__unit__"] = unitNamer.Build(metadata.Unit)
|
||||
}
|
||||
}
|
||||
|
||||
// Map service.name + service.namespace to job.
|
||||
if haveServiceName {
|
||||
val := serviceName.AsString()
|
||||
@ -255,7 +269,7 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali
|
||||
// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets:
|
||||
// https://github.com/prometheus/prometheus/issues/13485.
|
||||
func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, baseName string, scope scope,
|
||||
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -264,7 +278,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
|
||||
pt := dataPoints.At(x)
|
||||
timestamp := convertTimeStamp(pt.Timestamp())
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false)
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
|
||||
|
||||
// If the sum is unset, it indicates the _sum metric point should be
|
||||
// omitted
|
||||
@ -278,7 +292,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
sum.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
|
||||
sumlabels := createLabels(baseName+sumStr, baseLabels)
|
||||
sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels)
|
||||
c.addSample(sum, sumlabels)
|
||||
}
|
||||
|
||||
@ -291,7 +305,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
count.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
|
||||
countlabels := createLabels(baseName+countStr, baseLabels)
|
||||
countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels)
|
||||
c.addSample(count, countlabels)
|
||||
|
||||
// cumulative count for conversion to cumulative histogram
|
||||
@ -315,7 +329,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
bucket.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
|
||||
labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr)
|
||||
labels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, boundStr)
|
||||
ts := c.addSample(bucket, labels)
|
||||
|
||||
bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound})
|
||||
@ -329,7 +343,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
} else {
|
||||
infBucket.Value = float64(pt.Count())
|
||||
}
|
||||
infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr)
|
||||
infLabels := createLabels(metadata.MetricFamilyName+bucketStr, baseLabels, leStr, pInfStr)
|
||||
ts := c.addSample(infBucket, infLabels)
|
||||
|
||||
bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)})
|
||||
@ -339,7 +353,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
|
||||
startTimestamp := pt.StartTimestamp()
|
||||
if settings.ExportCreatedMetric && startTimestamp != 0 {
|
||||
labels := createLabels(baseName+createdSuffix, baseLabels)
|
||||
labels := createLabels(metadata.MetricFamilyName+createdSuffix, baseLabels)
|
||||
c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp())
|
||||
}
|
||||
}
|
||||
@ -465,7 +479,7 @@ func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp p
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
|
||||
settings Settings, baseName string, scope scope,
|
||||
settings Settings, metadata prompb.MetricMetadata, scope scope,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -474,7 +488,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
|
||||
|
||||
pt := dataPoints.At(x)
|
||||
timestamp := convertTimeStamp(pt.Timestamp())
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false)
|
||||
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
|
||||
|
||||
// treat sum as a sample in an individual TimeSeries
|
||||
sum := &prompb.Sample{
|
||||
@ -485,7 +499,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
|
||||
sum.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
// sum and count of the summary should append suffix to baseName
|
||||
sumlabels := createLabels(baseName+sumStr, baseLabels)
|
||||
sumlabels := createLabels(metadata.MetricFamilyName+sumStr, baseLabels)
|
||||
c.addSample(sum, sumlabels)
|
||||
|
||||
// treat count as a sample in an individual TimeSeries
|
||||
@ -496,7 +510,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
|
||||
if pt.Flags().NoRecordedValue() {
|
||||
count.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
countlabels := createLabels(baseName+countStr, baseLabels)
|
||||
countlabels := createLabels(metadata.MetricFamilyName+countStr, baseLabels)
|
||||
c.addSample(count, countlabels)
|
||||
|
||||
// process each percentile/quantile
|
||||
@ -510,13 +524,13 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
|
||||
quantile.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
|
||||
qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr)
|
||||
qtlabels := createLabels(metadata.MetricFamilyName, baseLabels, quantileStr, percentileStr)
|
||||
c.addSample(quantile, qtlabels)
|
||||
}
|
||||
|
||||
startTimestamp := pt.StartTimestamp()
|
||||
if settings.ExportCreatedMetric && startTimestamp != 0 {
|
||||
createdLabels := createLabels(baseName+createdSuffix, baseLabels)
|
||||
createdLabels := createLabels(metadata.MetricFamilyName+createdSuffix, baseLabels)
|
||||
c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp())
|
||||
}
|
||||
}
|
||||
@ -542,6 +556,20 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr
|
||||
return labels
|
||||
}
|
||||
|
||||
// addTypeAndUnitLabels appends type and unit labels to the given labels slice.
|
||||
func addTypeAndUnitLabels(labels []prompb.Label, metadata prompb.MetricMetadata, settings Settings) []prompb.Label {
|
||||
unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8}
|
||||
|
||||
labels = slices.DeleteFunc(labels, func(l prompb.Label) bool {
|
||||
return l.Name == "__type__" || l.Name == "__unit__"
|
||||
})
|
||||
|
||||
labels = append(labels, prompb.Label{Name: "__type__", Value: strings.ToLower(metadata.Type.String())})
|
||||
labels = append(labels, prompb.Label{Name: "__unit__", Value: unitNamer.Build(metadata.Unit)})
|
||||
|
||||
return labels
|
||||
}
|
||||
|
||||
// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false.
|
||||
// Otherwise it creates a new one and returns that, and true.
|
||||
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
|
||||
@ -627,7 +655,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earlies
|
||||
// Do not pass identifying attributes as ignoreAttrs below.
|
||||
identifyingAttrs = nil
|
||||
}
|
||||
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, model.MetricNameLabel, name)
|
||||
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, name)
|
||||
haveIdentifier := false
|
||||
for _, l := range labels {
|
||||
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
|
||||
|
@ -531,7 +531,7 @@ func TestCreateAttributes(t *testing.T) {
|
||||
}),
|
||||
PromoteScopeMetadata: tc.promoteScope,
|
||||
}
|
||||
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, model.MetricNameLabel, "test_metric")
|
||||
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, "test_metric")
|
||||
|
||||
require.ElementsMatch(t, lbls, tc.expectedLabels)
|
||||
})
|
||||
@ -746,7 +746,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
|
||||
ExportCreatedMetric: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
metric.Name(),
|
||||
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
|
||||
tt.scope,
|
||||
)
|
||||
|
||||
@ -944,7 +944,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
|
||||
ExportCreatedMetric: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
metric.Name(),
|
||||
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
|
||||
tt.scope,
|
||||
)
|
||||
|
||||
@ -988,3 +988,58 @@ func TestGetPromExemplars(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAddTypeAndUnitLabels(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
inputLabels []prompb.Label
|
||||
metadata prompb.MetricMetadata
|
||||
expectedLabels []prompb.Label
|
||||
}{
|
||||
{
|
||||
name: "overwrites existing type and unit labels and preserves other labels",
|
||||
inputLabels: []prompb.Label{
|
||||
{Name: "job", Value: "test-job"},
|
||||
{Name: "__type__", Value: "old_type"},
|
||||
{Name: "instance", Value: "test-instance"},
|
||||
{Name: "__unit__", Value: "old_unit"},
|
||||
{Name: "custom_label", Value: "custom_value"},
|
||||
},
|
||||
metadata: prompb.MetricMetadata{
|
||||
Type: prompb.MetricMetadata_COUNTER,
|
||||
Unit: "seconds",
|
||||
},
|
||||
expectedLabels: []prompb.Label{
|
||||
{Name: "job", Value: "test-job"},
|
||||
{Name: "instance", Value: "test-instance"},
|
||||
{Name: "custom_label", Value: "custom_value"},
|
||||
{Name: "__type__", Value: "counter"},
|
||||
{Name: "__unit__", Value: "seconds"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "adds type and unit labels when missing",
|
||||
inputLabels: []prompb.Label{
|
||||
{Name: "job", Value: "test-job"},
|
||||
{Name: "instance", Value: "test-instance"},
|
||||
},
|
||||
metadata: prompb.MetricMetadata{
|
||||
Type: prompb.MetricMetadata_GAUGE,
|
||||
Unit: "bytes",
|
||||
},
|
||||
expectedLabels: []prompb.Label{
|
||||
{Name: "job", Value: "test-job"},
|
||||
{Name: "instance", Value: "test-instance"},
|
||||
{Name: "__type__", Value: "gauge"},
|
||||
{Name: "__unit__", Value: "bytes"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result := addTypeAndUnitLabels(tc.inputLabels, tc.metadata, Settings{AllowUTF8: false})
|
||||
require.ElementsMatch(t, tc.expectedLabels, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,7 @@ const defaultZeroThreshold = 1e-128
|
||||
// addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series
|
||||
// as native histogram samples.
|
||||
func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, promName string, temporality pmetric.AggregationTemporality,
|
||||
scope scope,
|
||||
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope,
|
||||
) (annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
@ -60,9 +59,11 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
metadata,
|
||||
model.MetricNameLabel,
|
||||
promName,
|
||||
metadata.MetricFamilyName,
|
||||
)
|
||||
|
||||
ts, _ := c.getOrCreateTimeSeries(lbls)
|
||||
ts.Histograms = append(ts.Histograms, histogram)
|
||||
|
||||
@ -253,8 +254,7 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, promName string, temporality pmetric.AggregationTemporality,
|
||||
scope scope,
|
||||
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, scope scope,
|
||||
) (annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
|
||||
@ -278,8 +278,9 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
metadata,
|
||||
model.MetricNameLabel,
|
||||
promName,
|
||||
metadata.MetricFamilyName,
|
||||
)
|
||||
|
||||
ts, _ := c.getOrCreateTimeSeries(lbls)
|
||||
|
@ -855,7 +855,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
|
||||
ExportCreatedMetric: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
namer.Build(TranslatorMetricFromOtelMetric(metric)),
|
||||
prompb.MetricMetadata{MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric))},
|
||||
pmetric.AggregationTemporalityCumulative,
|
||||
tt.scope,
|
||||
)
|
||||
@ -1312,7 +1312,7 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) {
|
||||
ConvertHistogramsToNHCB: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
namer.Build(TranslatorMetricFromOtelMetric(metric)),
|
||||
prompb.MetricMetadata{MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric))},
|
||||
pmetric.AggregationTemporalityCumulative,
|
||||
tt.scope,
|
||||
)
|
||||
|
@ -53,7 +53,8 @@ type Settings struct {
|
||||
// LookbackDelta is the PromQL engine lookback delta.
|
||||
LookbackDelta time.Duration
|
||||
// PromoteScopeMetadata controls whether to promote OTel scope metadata to metric labels.
|
||||
PromoteScopeMetadata bool
|
||||
PromoteScopeMetadata bool
|
||||
EnableTypeAndUnitLabels bool
|
||||
}
|
||||
|
||||
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
|
||||
@ -170,13 +171,13 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
continue
|
||||
}
|
||||
|
||||
promName := namer.Build(TranslatorMetricFromOtelMetric(metric))
|
||||
c.metadata = append(c.metadata, prompb.MetricMetadata{
|
||||
metadata := prompb.MetricMetadata{
|
||||
Type: otelMetricTypeToPromMetricType(metric),
|
||||
MetricFamilyName: promName,
|
||||
MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric)),
|
||||
Help: metric.Description(),
|
||||
Unit: metric.Unit(),
|
||||
})
|
||||
}
|
||||
c.metadata = append(c.metadata, metadata)
|
||||
|
||||
// handle individual metrics based on type
|
||||
//exhaustive:enforce
|
||||
@ -187,7 +188,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil {
|
||||
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
@ -199,7 +200,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName, scope); err != nil {
|
||||
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
@ -213,7 +214,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
}
|
||||
if settings.ConvertHistogramsToNHCB {
|
||||
ws, err := c.addCustomBucketsHistogramDataPoints(
|
||||
ctx, dataPoints, resource, settings, promName, temporality, scope,
|
||||
ctx, dataPoints, resource, settings, metadata, temporality, scope,
|
||||
)
|
||||
annots.Merge(ws)
|
||||
if err != nil {
|
||||
@ -223,7 +224,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil {
|
||||
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
@ -241,7 +242,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
dataPoints,
|
||||
resource,
|
||||
settings,
|
||||
promName,
|
||||
metadata,
|
||||
temporality,
|
||||
scope,
|
||||
)
|
||||
@ -258,7 +259,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, promName, scope); err != nil {
|
||||
if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
)
|
||||
|
||||
func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, name string, scope scope,
|
||||
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -44,8 +44,9 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
metadata,
|
||||
model.MetricNameLabel,
|
||||
name,
|
||||
metadata.MetricFamilyName,
|
||||
)
|
||||
sample := &prompb.Sample{
|
||||
// convert ns to ms
|
||||
@ -60,6 +61,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
|
||||
if pt.Flags().NoRecordedValue() {
|
||||
sample.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
|
||||
c.addSample(sample, labels)
|
||||
}
|
||||
|
||||
@ -67,7 +69,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
|
||||
resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string, scope scope,
|
||||
resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -82,8 +84,9 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
metadata,
|
||||
model.MetricNameLabel,
|
||||
name,
|
||||
metadata.MetricFamilyName,
|
||||
)
|
||||
sample := &prompb.Sample{
|
||||
// convert ns to ms
|
||||
@ -98,6 +101,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
|
||||
if pt.Flags().NoRecordedValue() {
|
||||
sample.Value = math.Float64frombits(value.StaleNaN)
|
||||
}
|
||||
|
||||
ts := c.addSample(sample, lbls)
|
||||
if ts != nil {
|
||||
exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt)
|
||||
@ -118,7 +122,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
|
||||
copy(createdLabels, lbls)
|
||||
for i, l := range createdLabels {
|
||||
if l.Name == model.MetricNameLabel {
|
||||
createdLabels[i].Value = name + createdSuffix
|
||||
createdLabels[i].Value = metadata.MetricFamilyName + createdSuffix
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +124,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) {
|
||||
ExportCreatedMetric: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
metric.Name(),
|
||||
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
|
||||
tt.scope,
|
||||
)
|
||||
|
||||
@ -362,7 +362,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
|
||||
ExportCreatedMetric: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
metric.Name(),
|
||||
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
|
||||
tt.scope,
|
||||
)
|
||||
|
||||
|
@ -533,6 +533,8 @@ type OTLPOptions struct {
|
||||
// LookbackDelta is the query lookback delta.
|
||||
// Used to calculate the target_info sample timestamp interval.
|
||||
LookbackDelta time.Duration
|
||||
// Add type and unit labels to the metrics.
|
||||
EnableTypeAndUnitLabels bool
|
||||
}
|
||||
|
||||
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
||||
@ -548,9 +550,10 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
|
||||
logger: logger,
|
||||
appendable: appendable,
|
||||
},
|
||||
config: configFunc,
|
||||
allowDeltaTemporality: opts.NativeDelta,
|
||||
lookbackDelta: opts.LookbackDelta,
|
||||
config: configFunc,
|
||||
allowDeltaTemporality: opts.NativeDelta,
|
||||
lookbackDelta: opts.LookbackDelta,
|
||||
enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
|
||||
}
|
||||
|
||||
wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex}
|
||||
@ -585,9 +588,10 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
|
||||
|
||||
type rwExporter struct {
|
||||
*writeHandler
|
||||
config func() config.Config
|
||||
allowDeltaTemporality bool
|
||||
lookbackDelta time.Duration
|
||||
config func() config.Config
|
||||
allowDeltaTemporality bool
|
||||
lookbackDelta time.Duration
|
||||
enableTypeAndUnitLabels bool
|
||||
}
|
||||
|
||||
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||
@ -601,9 +605,10 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er
|
||||
PromoteResourceAttributes: otlptranslator.NewPromoteResourceAttributes(otlpCfg),
|
||||
KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes,
|
||||
ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB,
|
||||
PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata,
|
||||
AllowDeltaTemporality: rw.allowDeltaTemporality,
|
||||
LookbackDelta: rw.lookbackDelta,
|
||||
PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata,
|
||||
EnableTypeAndUnitLabels: rw.enableTypeAndUnitLabels,
|
||||
})
|
||||
if err != nil {
|
||||
rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
|
||||
|
@ -384,12 +384,13 @@ func TestOTLPWriteHandler(t *testing.T) {
|
||||
timestamp := time.Now()
|
||||
exportRequest := generateOTLPWriteRequest(timestamp)
|
||||
for _, testCase := range []struct {
|
||||
name string
|
||||
otlpCfg config.OTLPConfig
|
||||
expectedSamples []mockSample
|
||||
name string
|
||||
otlpCfg config.OTLPConfig
|
||||
typeAndUnitLabels bool
|
||||
expectedSamples []mockSample
|
||||
}{
|
||||
{
|
||||
name: "NoTranslation",
|
||||
name: "NoTranslation/NoTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.NoTranslation,
|
||||
},
|
||||
@ -415,13 +416,42 @@ func TestOTLPWriteHandler(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "UnderscoreEscapingWithSuffixes",
|
||||
name: "NoTranslation/WithTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.NoTranslation,
|
||||
},
|
||||
typeAndUnitLabels: true,
|
||||
expectedSamples: []mockSample{
|
||||
{
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test.counter"},
|
||||
labels.Label{Name: "__type__", Value: "counter"},
|
||||
labels.Label{Name: "__unit__", Value: "bytes"},
|
||||
labels.Label{Name: "foo.bar", Value: "baz"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"}),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 10.0,
|
||||
},
|
||||
{
|
||||
l: labels.New(
|
||||
labels.Label{Name: "__name__", Value: "target_info"},
|
||||
labels.Label{Name: "host.name", Value: "test-host"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"},
|
||||
),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "UnderscoreEscapingWithSuffixes/NoTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.UnderscoreEscapingWithSuffixes,
|
||||
},
|
||||
expectedSamples: []mockSample{
|
||||
{
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_total"},
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_bytes_total"},
|
||||
labels.Label{Name: "foo_bar", Value: "baz"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"}),
|
||||
@ -467,13 +497,71 @@ func TestOTLPWriteHandler(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "NoUTF8EscapingWithSuffixes",
|
||||
name: "UnderscoreEscapingWithSuffixes/WithTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.UnderscoreEscapingWithSuffixes,
|
||||
},
|
||||
typeAndUnitLabels: true,
|
||||
expectedSamples: []mockSample{
|
||||
{
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test_counter_bytes_total"},
|
||||
labels.Label{Name: "__type__", Value: "counter"},
|
||||
labels.Label{Name: "__unit__", Value: "bytes"},
|
||||
labels.Label{Name: "foo_bar", Value: "baz"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"}),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 10.0,
|
||||
},
|
||||
{
|
||||
l: labels.New(
|
||||
labels.Label{Name: "__name__", Value: "target_info"},
|
||||
labels.Label{Name: "host_name", Value: "test-host"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"},
|
||||
),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "NoUTF8EscapingWithSuffixes/NoTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.NoUTF8EscapingWithSuffixes,
|
||||
},
|
||||
expectedSamples: []mockSample{
|
||||
{
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_total"},
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_bytes_total"},
|
||||
labels.Label{Name: "foo.bar", Value: "baz"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"}),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 10.0,
|
||||
},
|
||||
{
|
||||
l: labels.New(
|
||||
labels.Label{Name: "__name__", Value: "target_info"},
|
||||
labels.Label{Name: "host.name", Value: "test-host"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"},
|
||||
),
|
||||
t: timestamp.UnixMilli(),
|
||||
v: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "NoUTF8EscapingWithSuffixes/WithTypeAndUnitLabels",
|
||||
otlpCfg: config.OTLPConfig{
|
||||
TranslationStrategy: config.NoUTF8EscapingWithSuffixes,
|
||||
},
|
||||
typeAndUnitLabels: true,
|
||||
expectedSamples: []mockSample{
|
||||
{
|
||||
l: labels.New(labels.Label{Name: "__name__", Value: "test.counter_bytes_total"},
|
||||
labels.Label{Name: "__type__", Value: "counter"},
|
||||
labels.Label{Name: "__unit__", Value: "bytes"},
|
||||
labels.Label{Name: "foo.bar", Value: "baz"},
|
||||
labels.Label{Name: "instance", Value: "test-instance"},
|
||||
labels.Label{Name: "job", Value: "test-service"}),
|
||||
@ -494,7 +582,7 @@ func TestOTLPWriteHandler(t *testing.T) {
|
||||
},
|
||||
} {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
appendable := handleOTLP(t, exportRequest, testCase.otlpCfg)
|
||||
appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, testCase.typeAndUnitLabels)
|
||||
for _, sample := range testCase.expectedSamples {
|
||||
requireContainsSample(t, appendable.samples, sample)
|
||||
}
|
||||
@ -518,7 +606,7 @@ func requireContainsSample(t *testing.T, actual []mockSample, expected mockSampl
|
||||
"actual : %v", expected, actual))
|
||||
}
|
||||
|
||||
func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg config.OTLPConfig) *mockAppendable {
|
||||
func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg config.OTLPConfig, typeAndUnitLabels bool) *mockAppendable {
|
||||
buf, err := exportRequest.MarshalProto()
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -531,7 +619,7 @@ func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg c
|
||||
return config.Config{
|
||||
OTLPConfig: otlpCfg,
|
||||
}
|
||||
}, OTLPOptions{})
|
||||
}, OTLPOptions{EnableTypeAndUnitLabels: typeAndUnitLabels})
|
||||
recorder := httptest.NewRecorder()
|
||||
handler.ServeHTTP(recorder, req)
|
||||
|
||||
@ -559,6 +647,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest {
|
||||
counterMetric := scopeMetric.Metrics().AppendEmpty()
|
||||
counterMetric.SetName("test.counter")
|
||||
counterMetric.SetDescription("test-counter-description")
|
||||
counterMetric.SetUnit("By")
|
||||
counterMetric.SetEmptySum()
|
||||
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
|
||||
counterMetric.Sum().SetIsMonotonic(true)
|
||||
@ -579,6 +668,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest {
|
||||
gaugeMetric := scopeMetric.Metrics().AppendEmpty()
|
||||
gaugeMetric.SetName("test.gauge")
|
||||
gaugeMetric.SetDescription("test-gauge-description")
|
||||
gaugeMetric.SetUnit("By")
|
||||
gaugeMetric.SetEmptyGauge()
|
||||
|
||||
gaugeDataPoint := gaugeMetric.Gauge().DataPoints().AppendEmpty()
|
||||
@ -590,6 +680,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest {
|
||||
histogramMetric := scopeMetric.Metrics().AppendEmpty()
|
||||
histogramMetric.SetName("test.histogram")
|
||||
histogramMetric.SetDescription("test-histogram-description")
|
||||
histogramMetric.SetUnit("By")
|
||||
histogramMetric.SetEmptyHistogram()
|
||||
histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
|
||||
|
||||
@ -605,6 +696,7 @@ func generateOTLPWriteRequest(timestamp time.Time) pmetricotlp.ExportRequest {
|
||||
exponentialHistogramMetric := scopeMetric.Metrics().AppendEmpty()
|
||||
exponentialHistogramMetric.SetName("test.exponential.histogram")
|
||||
exponentialHistogramMetric.SetDescription("test-exponential-histogram-description")
|
||||
exponentialHistogramMetric.SetUnit("By")
|
||||
exponentialHistogramMetric.SetEmptyExponentialHistogram()
|
||||
exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
|
||||
|
||||
|
@ -265,6 +265,7 @@ func NewAPI(
|
||||
otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool,
|
||||
ctZeroIngestionEnabled bool,
|
||||
lookbackDelta time.Duration,
|
||||
enableTypeAndUnitLabels bool,
|
||||
) *API {
|
||||
a := &API{
|
||||
QueryEngine: qe,
|
||||
@ -312,9 +313,10 @@ func NewAPI(
|
||||
}
|
||||
if otlpEnabled {
|
||||
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{
|
||||
ConvertDelta: otlpDeltaToCumulative,
|
||||
NativeDelta: otlpNativeDeltaIngestion,
|
||||
LookbackDelta: lookbackDelta,
|
||||
ConvertDelta: otlpDeltaToCumulative,
|
||||
NativeDelta: otlpNativeDeltaIngestion,
|
||||
LookbackDelta: lookbackDelta,
|
||||
EnableTypeAndUnitLabels: enableTypeAndUnitLabels,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -146,6 +146,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
|
||||
false,
|
||||
false,
|
||||
5*time.Minute,
|
||||
false,
|
||||
)
|
||||
|
||||
promRouter := route.New().WithPrefix("/api/v1")
|
||||
|
@ -293,6 +293,7 @@ type Options struct {
|
||||
NativeOTLPDeltaIngestion bool
|
||||
IsAgent bool
|
||||
CTZeroIngestionEnabled bool
|
||||
EnableTypeAndUnitLabels bool
|
||||
AppName string
|
||||
|
||||
AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg
|
||||
@ -393,6 +394,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
|
||||
o.NativeOTLPDeltaIngestion,
|
||||
o.CTZeroIngestionEnabled,
|
||||
o.LookbackDelta,
|
||||
o.EnableTypeAndUnitLabels,
|
||||
)
|
||||
|
||||
if o.RoutePrefix != "/" {
|
||||
|
Loading…
Reference in New Issue
Block a user