mirror of
https://github.com/prometheus/prometheus.git
synced 2026-01-17 06:31:03 +01:00
OTLP: label caching for OTLP-to-Prometheus conversion to reduce allocations and improve latency (#17860)
* otlptranslator: add label caching for OTLP-to-Prometheus conversion Add per-request caching to reduce redundant computation and allocations during OTLP metric conversion: 1. Per-request label sanitization cache: Cache sanitized label names within a request to avoid repeated string allocations for commonly repeated labels like __name__, job, instance. 2. Resource-level label caching: Precompute and cache job, instance, promoted resource attributes, and external labels once per ResourceMetrics boundary instead of for each datapoint. 3. Scope-level label caching: Precompute and cache scope metadata labels (otel_scope_name, otel_scope_version, etc.) once per ScopeMetrics boundary. 4. LabelNamer instance caching: Reuse the LabelNamer struct across datapoints within the same resource context. These optimizations significantly reduce allocations and improve latency for OTLP ingestion workloads with many datapoints per resource/scope. --------- Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
This commit is contained in:
parent
ded3a8c309
commit
4afa76d083
1
go.mod
1
go.mod
@ -71,7 +71,6 @@ require (
|
||||
go.opentelemetry.io/collector/consumer v1.48.0
|
||||
go.opentelemetry.io/collector/pdata v1.48.0
|
||||
go.opentelemetry.io/collector/processor v1.48.0
|
||||
go.opentelemetry.io/collector/semconv v0.128.0
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.64.0
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0
|
||||
go.opentelemetry.io/otel v1.39.0
|
||||
|
||||
2
go.sum
2
go.sum
@ -571,8 +571,6 @@ go.opentelemetry.io/collector/processor/processortest v0.142.0 h1:wQnJeXDejBL6r8
|
||||
go.opentelemetry.io/collector/processor/processortest v0.142.0/go.mod h1:QU5SWj0L+92MSvQxZDjwWCsKssNDm+nD6SHn7IvviUE=
|
||||
go.opentelemetry.io/collector/processor/xprocessor v0.142.0 h1:7a1Crxrd5iBMVnebTxkcqxVkRHAlOBUUmNTUVUTnlCU=
|
||||
go.opentelemetry.io/collector/processor/xprocessor v0.142.0/go.mod h1:LY/GS2DiJILJKS3ynU3eOLLWSP8CmN1FtdpAMsVV8AU=
|
||||
go.opentelemetry.io/collector/semconv v0.128.0 h1:MzYOz7Vgb3Kf5D7b49pqqgeUhEmOCuT10bIXb/Cc+k4=
|
||||
go.opentelemetry.io/collector/semconv v0.128.0/go.mod h1:OPXer4l43X23cnjLXIZnRj/qQOjSuq4TgBLI76P9hns=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.64.0 h1:OXSUzgmIFkcC4An+mv+lqqZSndTffXpjAyoR+1f8k/A=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.64.0/go.mod h1:1A4GVLFIm54HFqVdOpWmukap7rgb0frrE3zWXohLPdM=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y=
|
||||
|
||||
@ -19,6 +19,7 @@ package prometheusremotewrite
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
@ -32,7 +33,7 @@ import (
|
||||
"github.com/prometheus/otlptranslator"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
@ -63,15 +64,14 @@ const (
|
||||
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
|
||||
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
|
||||
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
|
||||
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
|
||||
func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings,
|
||||
//
|
||||
// This function requires for cached resource and scope labels to be set up first.
|
||||
func (c *PrometheusConverter) createAttributes(attributes pcommon.Map, settings Settings,
|
||||
ignoreAttrs []string, logOnOverwrite bool, meta Metadata, extras ...string,
|
||||
) (labels.Labels, error) {
|
||||
resourceAttrs := resource.Attributes()
|
||||
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
|
||||
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)
|
||||
|
||||
promoteScope := settings.PromoteScopeMetadata && scope.name != ""
|
||||
if c.resourceLabels == nil {
|
||||
return labels.EmptyLabels(), errors.New("createAttributes called without initializing resource context")
|
||||
}
|
||||
|
||||
// Ensure attributes are sorted by key for consistent merging of keys which
|
||||
// collide when sanitized.
|
||||
@ -88,12 +88,6 @@ func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attrib
|
||||
c.scratchBuilder.Sort()
|
||||
sortedLabels := c.scratchBuilder.Labels()
|
||||
|
||||
labelNamer := otlptranslator.LabelNamer{
|
||||
UTF8Allowed: settings.AllowUTF8,
|
||||
UnderscoreLabelSanitization: settings.LabelNameUnderscoreSanitization,
|
||||
PreserveMultipleUnderscores: settings.LabelNamePreserveMultipleUnderscores,
|
||||
}
|
||||
|
||||
if settings.AllowUTF8 {
|
||||
// UTF8 is allowed, so conflicts aren't possible.
|
||||
c.builder.Reset(sortedLabels)
|
||||
@ -106,7 +100,7 @@ func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attrib
|
||||
if sortErr != nil {
|
||||
return
|
||||
}
|
||||
finalKey, err := labelNamer.Build(l.Name)
|
||||
finalKey, err := c.buildLabelName(l.Name)
|
||||
if err != nil {
|
||||
sortErr = err
|
||||
return
|
||||
@ -122,28 +116,36 @@ func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attrib
|
||||
}
|
||||
}
|
||||
|
||||
err := settings.PromoteResourceAttributes.addPromotedAttributes(c.builder, resourceAttrs, labelNamer)
|
||||
if err != nil {
|
||||
return labels.EmptyLabels(), err
|
||||
}
|
||||
if promoteScope {
|
||||
var rangeErr error
|
||||
scope.attributes.Range(func(k string, v pcommon.Value) bool {
|
||||
name, err := labelNamer.Build("otel_scope_" + k)
|
||||
if err != nil {
|
||||
rangeErr = err
|
||||
return false
|
||||
if settings.PromoteResourceAttributes != nil {
|
||||
// Merge cached promoted resource labels.
|
||||
c.resourceLabels.promotedLabels.Range(func(l labels.Label) {
|
||||
if c.builder.Get(l.Name) == "" {
|
||||
c.builder.Set(l.Name, l.Value)
|
||||
}
|
||||
c.builder.Set(name, v.AsString())
|
||||
return true
|
||||
})
|
||||
if rangeErr != nil {
|
||||
return labels.EmptyLabels(), rangeErr
|
||||
}
|
||||
// Merge cached job/instance labels.
|
||||
if c.resourceLabels.jobLabel != "" {
|
||||
c.builder.Set(model.JobLabel, c.resourceLabels.jobLabel)
|
||||
}
|
||||
if c.resourceLabels.instanceLabel != "" {
|
||||
c.builder.Set(model.InstanceLabel, c.resourceLabels.instanceLabel)
|
||||
}
|
||||
// Merge cached external labels.
|
||||
for key, value := range c.resourceLabels.externalLabels {
|
||||
if c.builder.Get(key) == "" {
|
||||
c.builder.Set(key, value)
|
||||
}
|
||||
// Scope Name, Version and Schema URL are added after attributes to ensure they are not overwritten by attributes.
|
||||
c.builder.Set("otel_scope_name", scope.name)
|
||||
c.builder.Set("otel_scope_version", scope.version)
|
||||
c.builder.Set("otel_scope_schema_url", scope.schemaURL)
|
||||
}
|
||||
|
||||
if c.scopeLabels != nil {
|
||||
// Merge cached scope labels if scope promotion is enabled.
|
||||
c.scopeLabels.scopeAttrs.Range(func(l labels.Label) {
|
||||
c.builder.Set(l.Name, l.Value)
|
||||
})
|
||||
c.builder.Set("otel_scope_name", c.scopeLabels.scopeName)
|
||||
c.builder.Set("otel_scope_version", c.scopeLabels.scopeVersion)
|
||||
c.builder.Set("otel_scope_schema_url", c.scopeLabels.scopeSchemaURL)
|
||||
}
|
||||
|
||||
if settings.EnableTypeAndUnitLabels {
|
||||
@ -156,27 +158,6 @@ func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attrib
|
||||
}
|
||||
}
|
||||
|
||||
// Map service.name + service.namespace to job.
|
||||
if haveServiceName {
|
||||
val := serviceName.AsString()
|
||||
if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok {
|
||||
val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
|
||||
}
|
||||
c.builder.Set(model.JobLabel, val)
|
||||
}
|
||||
// Map service.instance.id to instance.
|
||||
if haveInstanceID {
|
||||
c.builder.Set(model.InstanceLabel, instance.AsString())
|
||||
}
|
||||
for key, value := range settings.ExternalLabels {
|
||||
// External labels have already been sanitized.
|
||||
if existingValue := c.builder.Get(key); existingValue != "" {
|
||||
// Skip external labels if they are overridden by metric attributes.
|
||||
continue
|
||||
}
|
||||
c.builder.Set(key, value)
|
||||
}
|
||||
|
||||
for i := 0; i < len(extras); i += 2 {
|
||||
if i+1 >= len(extras) {
|
||||
break
|
||||
@ -189,7 +170,7 @@ func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attrib
|
||||
// internal labels should be maintained.
|
||||
if len(name) <= 4 || name[:2] != "__" || name[len(name)-2:] != "__" {
|
||||
var err error
|
||||
name, err = labelNamer.Build(name)
|
||||
name, err = c.buildLabelName(name)
|
||||
if err != nil {
|
||||
return labels.EmptyLabels(), err
|
||||
}
|
||||
@ -223,7 +204,7 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali
|
||||
// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets:
|
||||
// https://github.com/prometheus/prometheus/issues/13485.
|
||||
func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, scope scope, meta Metadata,
|
||||
settings Settings, meta Metadata,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -233,7 +214,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
|
||||
pt := dataPoints.At(x)
|
||||
timestamp := convertTimeStamp(pt.Timestamp())
|
||||
startTimestamp := convertTimeStamp(pt.StartTimestamp())
|
||||
baseLabels, err := c.createAttributes(resource, pt.Attributes(), scope, settings, nil, false, meta)
|
||||
baseLabels, err := c.createAttributes(pt.Attributes(), settings, nil, false, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -424,8 +405,8 @@ func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp p
|
||||
return minTimestamp, maxTimestamp
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
|
||||
settings Settings, scope scope, meta Metadata,
|
||||
func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice,
|
||||
settings Settings, meta Metadata,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -435,7 +416,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
|
||||
pt := dataPoints.At(x)
|
||||
timestamp := convertTimeStamp(pt.Timestamp())
|
||||
startTimestamp := convertTimeStamp(pt.StartTimestamp())
|
||||
baseLabels, err := c.createAttributes(resource, pt.Attributes(), scope, settings, nil, false, meta)
|
||||
baseLabels, err := c.createAttributes(pt.Attributes(), settings, nil, false, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -504,9 +485,9 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s
|
||||
|
||||
attributes := resource.Attributes()
|
||||
identifyingAttrs := []string{
|
||||
conventions.AttributeServiceNamespace,
|
||||
conventions.AttributeServiceName,
|
||||
conventions.AttributeServiceInstanceID,
|
||||
string(semconv.ServiceNamespaceKey),
|
||||
string(semconv.ServiceNameKey),
|
||||
string(semconv.ServiceInstanceIDKey),
|
||||
}
|
||||
nonIdentifyingAttrsCount := attributes.Len()
|
||||
for _, a := range identifyingAttrs {
|
||||
@ -538,7 +519,12 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s
|
||||
MetricFamilyName: name,
|
||||
}
|
||||
// TODO: should target info have the __type__ metadata label?
|
||||
lbls, err := c.createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, Metadata{}, model.MetricNameLabel, name)
|
||||
// target_info is a resource-level metric and should not include scope labels.
|
||||
// Temporarily clear scope labels for this call.
|
||||
savedScopeLabels := c.scopeLabels
|
||||
c.scopeLabels = nil
|
||||
lbls, err := c.createAttributes(attributes, settings, identifyingAttrs, false, Metadata{}, model.MetricNameLabel, name)
|
||||
c.scopeLabels = savedScopeLabels
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -413,7 +413,11 @@ func TestCreateAttributes(t *testing.T) {
|
||||
if tc.attrs != (pcommon.Map{}) {
|
||||
testAttrs = tc.attrs
|
||||
}
|
||||
lbls, err := c.createAttributes(testResource, testAttrs, tc.scope, settings, tc.ignoreAttrs, false, Metadata{}, model.MetricNameLabel, "test_metric")
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, c.setResourceContext(testResource, settings))
|
||||
require.NoError(t, c.setScopeContext(tc.scope, settings))
|
||||
|
||||
lbls, err := c.createAttributes(testAttrs, settings, tc.ignoreAttrs, false, Metadata{}, model.MetricNameLabel, "test_metric")
|
||||
require.NoError(t, err)
|
||||
|
||||
testutil.RequireEqual(t, tc.expectedLabels, lbls)
|
||||
@ -643,15 +647,19 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
|
||||
metric := tt.metric()
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
converter.addSummaryDataPoints(
|
||||
context.Background(),
|
||||
metric.Summary().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
tt.scope,
|
||||
settings,
|
||||
Metadata{
|
||||
MetricFamilyName: metric.Name(),
|
||||
},
|
||||
@ -806,15 +814,19 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
|
||||
metric := tt.metric()
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
converter.addHistogramDataPoints(
|
||||
context.Background(),
|
||||
metric.Histogram().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
tt.scope,
|
||||
settings,
|
||||
Metadata{
|
||||
MetricFamilyName: metric.Name(),
|
||||
},
|
||||
|
||||
@ -22,7 +22,6 @@ import (
|
||||
"math"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
@ -35,8 +34,7 @@ const defaultZeroThreshold = 1e-128
|
||||
// addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series
|
||||
// as native histogram samples.
|
||||
func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, temporality pmetric.AggregationTemporality,
|
||||
scope scope, meta Metadata,
|
||||
settings Settings, temporality pmetric.AggregationTemporality, meta Metadata,
|
||||
) (annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
@ -53,9 +51,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
|
||||
}
|
||||
|
||||
lbls, err := c.createAttributes(
|
||||
resource,
|
||||
pt.Attributes(),
|
||||
scope,
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
@ -253,8 +249,7 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, temporality pmetric.AggregationTemporality,
|
||||
scope scope, meta Metadata,
|
||||
settings Settings, temporality pmetric.AggregationTemporality, meta Metadata,
|
||||
) (annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
|
||||
@ -272,9 +267,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
|
||||
}
|
||||
|
||||
lbls, err := c.createAttributes(
|
||||
resource,
|
||||
pt.Attributes(),
|
||||
scope,
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
|
||||
@ -861,15 +861,20 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
|
||||
}
|
||||
name, err := namer.Build(TranslatorMetricFromOtelMetric(metric))
|
||||
require.NoError(t, err)
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
annots, err := converter.addExponentialHistogramDataPoints(
|
||||
context.Background(),
|
||||
metric.ExponentialHistogram().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
settings,
|
||||
pmetric.AggregationTemporalityCumulative,
|
||||
tt.scope,
|
||||
Metadata{
|
||||
MetricFamilyName: name,
|
||||
},
|
||||
@ -1334,16 +1339,21 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) {
|
||||
}
|
||||
name, err := namer.Build(TranslatorMetricFromOtelMetric(metric))
|
||||
require.NoError(t, err)
|
||||
settings := Settings{
|
||||
ConvertHistogramsToNHCB: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
annots, err := converter.addCustomBucketsHistogramDataPoints(
|
||||
context.Background(),
|
||||
metric.Histogram().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
ConvertHistogramsToNHCB: true,
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
settings,
|
||||
pmetric.AggregationTemporalityCumulative,
|
||||
tt.scope,
|
||||
Metadata{
|
||||
MetricFamilyName: name,
|
||||
},
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
"github.com/prometheus/otlptranslator"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||
"go.uber.org/multierr"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
@ -62,6 +63,24 @@ type Settings struct {
|
||||
LabelNamePreserveMultipleUnderscores bool
|
||||
}
|
||||
|
||||
// cachedResourceLabels holds precomputed labels constant for all datapoints in a ResourceMetrics.
|
||||
// These are computed once per ResourceMetrics boundary and reused for all datapoints.
|
||||
type cachedResourceLabels struct {
|
||||
jobLabel string // from service.name + service.namespace.
|
||||
instanceLabel string // from service.instance.id.
|
||||
promotedLabels labels.Labels // promoted resource attributes.
|
||||
externalLabels map[string]string
|
||||
}
|
||||
|
||||
// cachedScopeLabels holds precomputed scope metadata labels.
|
||||
// These are computed once per ScopeMetrics boundary and reused for all datapoints.
|
||||
type cachedScopeLabels struct {
|
||||
scopeName string
|
||||
scopeVersion string
|
||||
scopeSchemaURL string
|
||||
scopeAttrs labels.Labels // otel_scope_* labels.
|
||||
}
|
||||
|
||||
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
|
||||
type PrometheusConverter struct {
|
||||
everyN everyNTimes
|
||||
@ -70,6 +89,15 @@ type PrometheusConverter struct {
|
||||
appender CombinedAppender
|
||||
// seenTargetInfo tracks target_info samples within a batch to prevent duplicates.
|
||||
seenTargetInfo map[targetInfoKey]struct{}
|
||||
|
||||
// Label caching for optimization - computed once per resource/scope boundary.
|
||||
resourceLabels *cachedResourceLabels
|
||||
scopeLabels *cachedScopeLabels
|
||||
labelNamer otlptranslator.LabelNamer
|
||||
|
||||
// sanitizedLabels caches the results of label name sanitization within a request.
|
||||
// This avoids repeated string allocations for the same label names.
|
||||
sanitizedLabels map[string]string
|
||||
}
|
||||
|
||||
// targetInfoKey uniquely identifies a target_info sample by its labelset and timestamp.
|
||||
@ -80,12 +108,27 @@ type targetInfoKey struct {
|
||||
|
||||
func NewPrometheusConverter(appender CombinedAppender) *PrometheusConverter {
|
||||
return &PrometheusConverter{
|
||||
scratchBuilder: labels.NewScratchBuilder(0),
|
||||
builder: labels.NewBuilder(labels.EmptyLabels()),
|
||||
appender: appender,
|
||||
scratchBuilder: labels.NewScratchBuilder(0),
|
||||
builder: labels.NewBuilder(labels.EmptyLabels()),
|
||||
appender: appender,
|
||||
sanitizedLabels: make(map[string]string, 64), // Pre-size for typical label count.
|
||||
}
|
||||
}
|
||||
|
||||
// buildLabelName returns a sanitized label name, using the cache to avoid repeated allocations.
|
||||
func (c *PrometheusConverter) buildLabelName(label string) (string, error) {
|
||||
if sanitized, ok := c.sanitizedLabels[label]; ok {
|
||||
return sanitized, nil
|
||||
}
|
||||
|
||||
sanitized, err := c.labelNamer.Build(label)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
c.sanitizedLabels[label] = sanitized
|
||||
return sanitized, nil
|
||||
}
|
||||
|
||||
func TranslatorMetricFromOtelMetric(metric pmetric.Metric) otlptranslator.Metric {
|
||||
m := otlptranslator.Metric{
|
||||
Name: metric.Name(),
|
||||
@ -140,17 +183,27 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
c.seenTargetInfo = make(map[targetInfoKey]struct{})
|
||||
resourceMetricsSlice := md.ResourceMetrics()
|
||||
|
||||
for i := 0; i < resourceMetricsSlice.Len(); i++ {
|
||||
for i := range resourceMetricsSlice.Len() {
|
||||
resourceMetrics := resourceMetricsSlice.At(i)
|
||||
resource := resourceMetrics.Resource()
|
||||
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
|
||||
if err := c.setResourceContext(resource, settings); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// keep track of the earliest and latest timestamp in the ResourceMetrics for
|
||||
// use with the "target" info metric
|
||||
earliestTimestamp := pcommon.Timestamp(math.MaxUint64)
|
||||
latestTimestamp := pcommon.Timestamp(0)
|
||||
for j := 0; j < scopeMetricsSlice.Len(); j++ {
|
||||
for j := range scopeMetricsSlice.Len() {
|
||||
scopeMetrics := scopeMetricsSlice.At(j)
|
||||
scope := newScopeFromScopeMetrics(scopeMetrics)
|
||||
if err := c.setScopeContext(scope, settings); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
metricSlice := scopeMetrics.Metrics()
|
||||
|
||||
// TODO: decide if instrumentation library information should be exported as labels
|
||||
@ -202,7 +255,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, scope, meta); err != nil {
|
||||
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, settings, meta); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return annots, errs
|
||||
@ -214,7 +267,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, settings, scope, meta); err != nil {
|
||||
if err := c.addSumNumberDataPoints(ctx, dataPoints, settings, meta); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return annots, errs
|
||||
@ -228,7 +281,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
}
|
||||
if settings.ConvertHistogramsToNHCB {
|
||||
ws, err := c.addCustomBucketsHistogramDataPoints(
|
||||
ctx, dataPoints, resource, settings, temporality, scope, meta,
|
||||
ctx, dataPoints, settings, temporality, meta,
|
||||
)
|
||||
annots.Merge(ws)
|
||||
if err != nil {
|
||||
@ -238,7 +291,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, scope, meta); err != nil {
|
||||
if err := c.addHistogramDataPoints(ctx, dataPoints, settings, meta); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return annots, errs
|
||||
@ -254,10 +307,8 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
ws, err := c.addExponentialHistogramDataPoints(
|
||||
ctx,
|
||||
dataPoints,
|
||||
resource,
|
||||
settings,
|
||||
temporality,
|
||||
scope,
|
||||
meta,
|
||||
)
|
||||
annots.Merge(ws)
|
||||
@ -273,7 +324,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
|
||||
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
|
||||
break
|
||||
}
|
||||
if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, scope, meta); err != nil {
|
||||
if err := c.addSummaryDataPoints(ctx, dataPoints, settings, meta); err != nil {
|
||||
errs = multierr.Append(errs, err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return annots, errs
|
||||
@ -311,8 +362,11 @@ func NewPromoteResourceAttributes(otlpCfg config.OTLPConfig) *PromoteResourceAtt
|
||||
}
|
||||
}
|
||||
|
||||
// LabelNameBuilder is a function that builds/sanitizes label names.
|
||||
type LabelNameBuilder func(string) (string, error)
|
||||
|
||||
// addPromotedAttributes adds labels for promoted resourceAttributes to the builder.
|
||||
func (s *PromoteResourceAttributes) addPromotedAttributes(builder *labels.Builder, resourceAttributes pcommon.Map, labelNamer otlptranslator.LabelNamer) error {
|
||||
func (s *PromoteResourceAttributes) addPromotedAttributes(builder *labels.Builder, resourceAttributes pcommon.Map, buildLabelName LabelNameBuilder) error {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
@ -322,13 +376,11 @@ func (s *PromoteResourceAttributes) addPromotedAttributes(builder *labels.Builde
|
||||
resourceAttributes.Range(func(name string, value pcommon.Value) bool {
|
||||
if _, exists := s.attrs[name]; !exists {
|
||||
var normalized string
|
||||
normalized, err = labelNamer.Build(name)
|
||||
normalized, err = buildLabelName(name)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if builder.Get(normalized) == "" {
|
||||
builder.Set(normalized, value.AsString())
|
||||
}
|
||||
builder.Set(normalized, value.AsString())
|
||||
}
|
||||
return true
|
||||
})
|
||||
@ -338,15 +390,91 @@ func (s *PromoteResourceAttributes) addPromotedAttributes(builder *labels.Builde
|
||||
resourceAttributes.Range(func(name string, value pcommon.Value) bool {
|
||||
if _, exists := s.attrs[name]; exists {
|
||||
var normalized string
|
||||
normalized, err = labelNamer.Build(name)
|
||||
normalized, err = buildLabelName(name)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if builder.Get(normalized) == "" {
|
||||
builder.Set(normalized, value.AsString())
|
||||
}
|
||||
builder.Set(normalized, value.AsString())
|
||||
}
|
||||
return true
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// setResourceContext precomputes and caches resource-level labels.
|
||||
// Called once per ResourceMetrics boundary, before processing any datapoints.
|
||||
// If an error is returned, resource level cache is reset.
|
||||
func (c *PrometheusConverter) setResourceContext(resource pcommon.Resource, settings Settings) error {
|
||||
resourceAttrs := resource.Attributes()
|
||||
c.resourceLabels = &cachedResourceLabels{
|
||||
externalLabels: settings.ExternalLabels,
|
||||
}
|
||||
|
||||
c.labelNamer = otlptranslator.LabelNamer{
|
||||
UTF8Allowed: settings.AllowUTF8,
|
||||
UnderscoreLabelSanitization: settings.LabelNameUnderscoreSanitization,
|
||||
PreserveMultipleUnderscores: settings.LabelNamePreserveMultipleUnderscores,
|
||||
}
|
||||
|
||||
if serviceName, ok := resourceAttrs.Get(string(semconv.ServiceNameKey)); ok {
|
||||
val := serviceName.AsString()
|
||||
if serviceNamespace, ok := resourceAttrs.Get(string(semconv.ServiceNamespaceKey)); ok {
|
||||
val = serviceNamespace.AsString() + "/" + val
|
||||
}
|
||||
c.resourceLabels.jobLabel = val
|
||||
}
|
||||
|
||||
if instance, ok := resourceAttrs.Get(string(semconv.ServiceInstanceIDKey)); ok {
|
||||
c.resourceLabels.instanceLabel = instance.AsString()
|
||||
}
|
||||
|
||||
if settings.PromoteResourceAttributes != nil {
|
||||
c.builder.Reset(labels.EmptyLabels())
|
||||
if err := settings.PromoteResourceAttributes.addPromotedAttributes(c.builder, resourceAttrs, c.buildLabelName); err != nil {
|
||||
c.clearResourceContext()
|
||||
return err
|
||||
}
|
||||
c.resourceLabels.promotedLabels = c.builder.Labels()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setScopeContext precomputes and caches scope-level labels.
|
||||
// Called once per ScopeMetrics boundary, before processing any metrics.
|
||||
// If an error is returned, scope level cache is reset.
|
||||
func (c *PrometheusConverter) setScopeContext(scope scope, settings Settings) error {
|
||||
if !settings.PromoteScopeMetadata || scope.name == "" {
|
||||
c.scopeLabels = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
c.scopeLabels = &cachedScopeLabels{
|
||||
scopeName: scope.name,
|
||||
scopeVersion: scope.version,
|
||||
scopeSchemaURL: scope.schemaURL,
|
||||
}
|
||||
c.builder.Reset(labels.EmptyLabels())
|
||||
var err error
|
||||
scope.attributes.Range(func(k string, v pcommon.Value) bool {
|
||||
var name string
|
||||
name, err = c.buildLabelName("otel_scope_" + k)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
c.builder.Set(name, v.AsString())
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
c.scopeLabels = nil
|
||||
return err
|
||||
}
|
||||
|
||||
c.scopeLabels.scopeAttrs = c.builder.Labels()
|
||||
return nil
|
||||
}
|
||||
|
||||
// clearResourceContext clears cached labels between ResourceMetrics.
|
||||
func (c *PrometheusConverter) clearResourceContext() {
|
||||
c.resourceLabels = nil
|
||||
c.scopeLabels = nil
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
@ -456,6 +457,211 @@ func TestFromMetrics(t *testing.T) {
|
||||
},
|
||||
}, targetInfoSamples)
|
||||
})
|
||||
|
||||
t.Run("target_info should not include scope labels when PromoteScopeMetadata is enabled", func(t *testing.T) {
|
||||
// Regression test: When PromoteScopeMetadata is enabled and a scope has a non-empty name,
|
||||
// the cached scopeLabels should NOT be merged into target_info.
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
|
||||
// Set up resource attributes for job/instance labels.
|
||||
rm.Resource().Attributes().PutStr("service.name", "test-service")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", "instance-1")
|
||||
generateAttributes(rm.Resource().Attributes(), "resource", 2)
|
||||
|
||||
// Create a scope with a non-empty name (this triggers scope label caching).
|
||||
scopeMetrics := rm.ScopeMetrics().AppendEmpty()
|
||||
scope := scopeMetrics.Scope()
|
||||
scope.SetName("my-scope")
|
||||
scope.SetVersion("1.0.0")
|
||||
scope.Attributes().PutStr("scope-attr", "scope-value")
|
||||
|
||||
// Add a metric.
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
m := scopeMetrics.Metrics().AppendEmpty()
|
||||
m.SetEmptyGauge()
|
||||
m.SetName("test_gauge")
|
||||
m.SetDescription("test gauge")
|
||||
point := m.Gauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(1.0)
|
||||
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
annots, err := converter.FromMetrics(
|
||||
context.Background(),
|
||||
request.Metrics(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: true,
|
||||
LookbackDelta: defaultLookbackDelta,
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, annots)
|
||||
require.NoError(t, mockAppender.Commit())
|
||||
|
||||
// Find target_info samples.
|
||||
var targetInfoSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "target_info" {
|
||||
targetInfoSamples = append(targetInfoSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, targetInfoSamples, "expected target_info samples")
|
||||
|
||||
// Verify target_info does NOT have scope labels.
|
||||
for _, s := range targetInfoSamples {
|
||||
require.Empty(t, s.ls.Get("otel_scope_name"), "target_info should not have otel_scope_name")
|
||||
require.Empty(t, s.ls.Get("otel_scope_version"), "target_info should not have otel_scope_version")
|
||||
require.Empty(t, s.ls.Get("otel_scope_schema_url"), "target_info should not have otel_scope_schema_url")
|
||||
require.Empty(t, s.ls.Get("otel_scope_scope_attr"), "target_info should not have scope attributes")
|
||||
}
|
||||
|
||||
// Verify the metric itself DOES have scope labels.
|
||||
var metricSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "test_gauge" {
|
||||
metricSamples = append(metricSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, metricSamples, "expected metric samples")
|
||||
require.Equal(t, "my-scope", metricSamples[0].ls.Get("otel_scope_name"), "metric should have otel_scope_name")
|
||||
require.Equal(t, "1.0.0", metricSamples[0].ls.Get("otel_scope_version"), "metric should have otel_scope_version")
|
||||
})
|
||||
|
||||
t.Run("target_info should include promoted resource attributes", func(t *testing.T) {
|
||||
// Promoted resource attributes should appear on both metrics and target_info.
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
|
||||
// Set up resource attributes.
|
||||
rm.Resource().Attributes().PutStr("service.name", "test-service")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", "instance-1")
|
||||
rm.Resource().Attributes().PutStr("custom.promoted.attr", "promoted-value")
|
||||
rm.Resource().Attributes().PutStr("another.resource.attr", "another-value")
|
||||
|
||||
// Add a metric.
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
scopeMetrics := rm.ScopeMetrics().AppendEmpty()
|
||||
m := scopeMetrics.Metrics().AppendEmpty()
|
||||
m.SetEmptyGauge()
|
||||
m.SetName("test_gauge")
|
||||
m.SetDescription("test gauge")
|
||||
point := m.Gauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(1.0)
|
||||
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
annots, err := converter.FromMetrics(
|
||||
context.Background(),
|
||||
request.Metrics(),
|
||||
Settings{
|
||||
PromoteResourceAttributes: NewPromoteResourceAttributes(config.OTLPConfig{
|
||||
PromoteResourceAttributes: []string{"custom.promoted.attr"},
|
||||
}),
|
||||
LookbackDelta: defaultLookbackDelta,
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, annots)
|
||||
require.NoError(t, mockAppender.Commit())
|
||||
|
||||
// Find target_info samples.
|
||||
var targetInfoSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "target_info" {
|
||||
targetInfoSamples = append(targetInfoSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, targetInfoSamples, "expected target_info samples")
|
||||
|
||||
// Verify target_info has the promoted resource attribute.
|
||||
for _, s := range targetInfoSamples {
|
||||
require.Equal(t, "promoted-value", s.ls.Get("custom_promoted_attr"), "target_info should have promoted resource attributes")
|
||||
require.Equal(t, "another-value", s.ls.Get("another_resource_attr"), "target_info should have non-promoted resource attributes")
|
||||
}
|
||||
|
||||
// Verify the metric also has the promoted resource attribute.
|
||||
var metricSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "test_gauge" {
|
||||
metricSamples = append(metricSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, metricSamples, "expected metric samples")
|
||||
require.Equal(t, "promoted-value", metricSamples[0].ls.Get("custom_promoted_attr"), "metric should have promoted resource attribute")
|
||||
})
|
||||
|
||||
t.Run("target_info should include promoted attributes when KeepIdentifyingResourceAttributes is enabled", func(t *testing.T) {
|
||||
// When both PromoteResourceAttributes and KeepIdentifyingResourceAttributes are configured,
|
||||
// target_info should include both the promoted attributes and the identifying attributes.
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
|
||||
rm.Resource().Attributes().PutStr("service.name", "test-service")
|
||||
rm.Resource().Attributes().PutStr("service.namespace", "test-namespace")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", "instance-1")
|
||||
rm.Resource().Attributes().PutStr("custom.promoted.attr", "promoted-value")
|
||||
rm.Resource().Attributes().PutStr("another.resource.attr", "another-value")
|
||||
|
||||
// Add a metric.
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
scopeMetrics := rm.ScopeMetrics().AppendEmpty()
|
||||
m := scopeMetrics.Metrics().AppendEmpty()
|
||||
m.SetEmptyGauge()
|
||||
m.SetName("test_gauge")
|
||||
m.SetDescription("test gauge")
|
||||
point := m.Gauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(1.0)
|
||||
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
annots, err := converter.FromMetrics(
|
||||
context.Background(),
|
||||
request.Metrics(),
|
||||
Settings{
|
||||
PromoteResourceAttributes: NewPromoteResourceAttributes(config.OTLPConfig{
|
||||
PromoteResourceAttributes: []string{"custom.promoted.attr"},
|
||||
}),
|
||||
KeepIdentifyingResourceAttributes: true,
|
||||
LookbackDelta: defaultLookbackDelta,
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, annots)
|
||||
require.NoError(t, mockAppender.Commit())
|
||||
|
||||
var targetInfoSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "target_info" {
|
||||
targetInfoSamples = append(targetInfoSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, targetInfoSamples, "expected target_info samples")
|
||||
|
||||
// Verify target_info has the promoted resource attribute.
|
||||
for _, s := range targetInfoSamples {
|
||||
require.Equal(t, "promoted-value", s.ls.Get("custom_promoted_attr"), "target_info should have promoted resource attributes")
|
||||
// And it should have the identifying attributes (since KeepIdentifyingResourceAttributes is true).
|
||||
require.Equal(t, "test-service", s.ls.Get("service_name"), "target_info should have service.name when KeepIdentifyingResourceAttributes is true")
|
||||
require.Equal(t, "test-namespace", s.ls.Get("service_namespace"), "target_info should have service.namespace when KeepIdentifyingResourceAttributes is true")
|
||||
require.Equal(t, "instance-1", s.ls.Get("service_instance_id"), "target_info should have service.instance.id when KeepIdentifyingResourceAttributes is true")
|
||||
// And the non-promoted resource attribute.
|
||||
require.Equal(t, "another-value", s.ls.Get("another_resource_attr"), "target_info should have non-promoted resource attributes")
|
||||
}
|
||||
|
||||
// Verify the metric also has the promoted resource attribute.
|
||||
var metricSamples []combinedSample
|
||||
for _, s := range mockAppender.samples {
|
||||
if s.ls.Get(labels.MetricName) == "test_gauge" {
|
||||
metricSamples = append(metricSamples, s)
|
||||
}
|
||||
}
|
||||
require.NotEmpty(t, metricSamples, "expected metric samples")
|
||||
require.Equal(t, "promoted-value", metricSamples[0].ls.Get("custom_promoted_attr"), "metric should have promoted resource attribute")
|
||||
})
|
||||
}
|
||||
|
||||
func TestTemporality(t *testing.T) {
|
||||
@ -1323,3 +1529,276 @@ func generateExemplars(exemplars pmetric.ExemplarSlice, count int, ts pcommon.Ti
|
||||
e.SetTraceID(pcommon.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f})
|
||||
}
|
||||
}
|
||||
|
||||
// createMultiScopeExportRequest creates an export request with multiple scopes per resource.
|
||||
// This is useful for benchmarking resource-level label caching, where cached resource labels
|
||||
// (job, instance, promoted attributes) should be computed once and reused across all scopes.
|
||||
func createMultiScopeExportRequest(
|
||||
resourceAttributeCount int,
|
||||
scopeCount int,
|
||||
metricsPerScope int,
|
||||
labelsPerMetric int,
|
||||
scopeAttributeCount int,
|
||||
) pmetricotlp.ExportRequest {
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount)
|
||||
|
||||
// Set service attributes for job/instance label generation
|
||||
rm.Resource().Attributes().PutStr("service.name", "test-service")
|
||||
rm.Resource().Attributes().PutStr("service.namespace", "test-namespace")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", "instance-1")
|
||||
|
||||
for s := range scopeCount {
|
||||
scopeMetrics := rm.ScopeMetrics().AppendEmpty()
|
||||
scope := scopeMetrics.Scope()
|
||||
scope.SetName(fmt.Sprintf("scope-%d", s))
|
||||
scope.SetVersion("1.0.0")
|
||||
generateAttributes(scope.Attributes(), "scope", scopeAttributeCount)
|
||||
|
||||
metrics := scopeMetrics.Metrics()
|
||||
for m := range metricsPerScope {
|
||||
metric := metrics.AppendEmpty()
|
||||
metric.SetName(fmt.Sprintf("gauge_s%d_m%d", s, m))
|
||||
metric.SetDescription("gauge metric")
|
||||
metric.SetUnit("unit")
|
||||
point := metric.SetEmptyGauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(float64(m))
|
||||
generateAttributes(point.Attributes(), "series", labelsPerMetric)
|
||||
}
|
||||
}
|
||||
|
||||
return request
|
||||
}
|
||||
|
||||
// createRepeatedLabelsExportRequest creates an export request where the same label names
|
||||
// appear repeatedly across many datapoints. This is useful for benchmarking the label
|
||||
// sanitization cache, which should reduce allocations when the same label names are seen multiple times.
|
||||
func createRepeatedLabelsExportRequest(
|
||||
uniqueLabelNames int,
|
||||
datapointCount int,
|
||||
labelsPerDatapoint int,
|
||||
) pmetricotlp.ExportRequest {
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
rm.Resource().Attributes().PutStr("service.name", "test-service")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", "instance-1")
|
||||
|
||||
metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
|
||||
|
||||
// Pre-generate label names that will be reused.
|
||||
labelNames := make([]string, uniqueLabelNames)
|
||||
for i := range uniqueLabelNames {
|
||||
labelNames[i] = fmt.Sprintf("label.name.%d", i)
|
||||
}
|
||||
|
||||
for d := range datapointCount {
|
||||
metric := metrics.AppendEmpty()
|
||||
metric.SetName(fmt.Sprintf("gauge_%d", d))
|
||||
metric.SetDescription("gauge metric")
|
||||
metric.SetUnit("unit")
|
||||
point := metric.SetEmptyGauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(float64(d))
|
||||
|
||||
// Add labels using the same label names (cycling through them).
|
||||
for l := range labelsPerDatapoint {
|
||||
labelName := labelNames[l%uniqueLabelNames]
|
||||
point.Attributes().PutStr(labelName, fmt.Sprintf("value-%d-%d", d, l))
|
||||
}
|
||||
}
|
||||
|
||||
return request
|
||||
}
|
||||
|
||||
// createMultiResourceExportRequest creates an export request with multiple ResourceMetrics.
|
||||
// This is useful for benchmarking the overhead of cache clearing between resources and
|
||||
// verifying that caching still helps within each resource.
|
||||
func createMultiResourceExportRequest(
|
||||
resourceCount int,
|
||||
resourceAttributeCount int,
|
||||
metricsPerResource int,
|
||||
labelsPerMetric int,
|
||||
) pmetricotlp.ExportRequest {
|
||||
request := pmetricotlp.NewExportRequest()
|
||||
ts := pcommon.NewTimestampFromTime(time.Now())
|
||||
|
||||
for r := range resourceCount {
|
||||
rm := request.Metrics().ResourceMetrics().AppendEmpty()
|
||||
generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount)
|
||||
|
||||
// Set unique service attributes per resource for job/instance label generation.
|
||||
rm.Resource().Attributes().PutStr("service.name", fmt.Sprintf("service-%d", r))
|
||||
rm.Resource().Attributes().PutStr("service.namespace", "test-namespace")
|
||||
rm.Resource().Attributes().PutStr("service.instance.id", fmt.Sprintf("instance-%d", r))
|
||||
|
||||
metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
|
||||
for m := range metricsPerResource {
|
||||
metric := metrics.AppendEmpty()
|
||||
metric.SetName(fmt.Sprintf("gauge_r%d_m%d", r, m))
|
||||
metric.SetDescription("gauge metric")
|
||||
metric.SetUnit("unit")
|
||||
point := metric.SetEmptyGauge().DataPoints().AppendEmpty()
|
||||
point.SetTimestamp(ts)
|
||||
point.SetDoubleValue(float64(m))
|
||||
generateAttributes(point.Attributes(), "series", labelsPerMetric)
|
||||
}
|
||||
}
|
||||
|
||||
return request
|
||||
}
|
||||
|
||||
// BenchmarkFromMetrics_LabelCaching_MultipleDatapointsPerResource benchmarks the resource-level
|
||||
// label caching optimization. With caching, resource labels (job, instance, promoted
|
||||
// attributes) should be computed once per ResourceMetrics and reused for all datapoints.
|
||||
func BenchmarkFromMetrics_LabelCaching_MultipleDatapointsPerResource(b *testing.B) {
|
||||
const (
|
||||
labelsPerMetric = 5
|
||||
scopeAttributeCount = 3
|
||||
)
|
||||
for _, resourceAttrs := range []int{5, 50} {
|
||||
for _, scopeCount := range []int{1, 10} {
|
||||
for _, metricsPerScope := range []int{10, 100} {
|
||||
b.Run(fmt.Sprintf("res_attrs=%d/scopes=%d/metrics=%d", resourceAttrs, scopeCount, metricsPerScope), func(b *testing.B) {
|
||||
settings := Settings{
|
||||
PromoteResourceAttributes: NewPromoteResourceAttributes(config.OTLPConfig{
|
||||
PromoteAllResourceAttributes: true,
|
||||
}),
|
||||
}
|
||||
payload := createMultiScopeExportRequest(
|
||||
resourceAttrs,
|
||||
scopeCount,
|
||||
metricsPerScope,
|
||||
labelsPerMetric,
|
||||
scopeAttributeCount,
|
||||
)
|
||||
appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry())
|
||||
noOpLogger := promslog.NewNopLogger()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
app := &noOpAppender{}
|
||||
mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics)
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
_, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkFromMetrics_LabelCaching_RepeatedLabelNames benchmarks the label sanitization cache.
|
||||
// When the same label names appear across many datapoints, the sanitization should
|
||||
// only happen once per unique label name within a ResourceMetrics.
|
||||
func BenchmarkFromMetrics_LabelCaching_RepeatedLabelNames(b *testing.B) {
|
||||
const labelsPerDatapoint = 20
|
||||
for _, uniqueLabels := range []int{5, 50} {
|
||||
for _, datapoints := range []int{100, 1000} {
|
||||
b.Run(fmt.Sprintf("unique_labels=%d/datapoints=%d", uniqueLabels, datapoints), func(b *testing.B) {
|
||||
settings := Settings{}
|
||||
payload := createRepeatedLabelsExportRequest(
|
||||
uniqueLabels,
|
||||
datapoints,
|
||||
labelsPerDatapoint,
|
||||
)
|
||||
appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry())
|
||||
noOpLogger := promslog.NewNopLogger()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
app := &noOpAppender{}
|
||||
mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics)
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
_, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkFromMetrics_LabelCaching_ScopeMetadata benchmarks scope-level label caching when
|
||||
// PromoteScopeMetadata is enabled. Scope metadata labels (otel_scope_name, version, etc.)
|
||||
// should be computed once per ScopeMetrics and reused for all metrics within that scope.
|
||||
func BenchmarkFromMetrics_LabelCaching_ScopeMetadata(b *testing.B) {
|
||||
const (
|
||||
resourceAttributeCount = 5
|
||||
labelsPerMetric = 5
|
||||
)
|
||||
for _, scopeAttrs := range []int{0, 10} {
|
||||
for _, metricsPerScope := range []int{10, 100} {
|
||||
b.Run(fmt.Sprintf("scope_attrs=%d/metrics=%d", scopeAttrs, metricsPerScope), func(b *testing.B) {
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: true,
|
||||
}
|
||||
payload := createMultiScopeExportRequest(
|
||||
resourceAttributeCount,
|
||||
1, // single scope to isolate scope caching benefit
|
||||
metricsPerScope,
|
||||
labelsPerMetric,
|
||||
scopeAttrs,
|
||||
)
|
||||
appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry())
|
||||
noOpLogger := promslog.NewNopLogger()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
app := &noOpAppender{}
|
||||
mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics)
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
_, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkFromMetrics_LabelCaching_MultipleResources benchmarks requests with multiple
|
||||
// ResourceMetrics. The label sanitization cache is cleared between resources, so this
|
||||
// measures the overhead of cache clearing and verifies caching helps within each resource.
|
||||
func BenchmarkFromMetrics_LabelCaching_MultipleResources(b *testing.B) {
|
||||
const (
|
||||
resourceAttributeCount = 10
|
||||
labelsPerMetric = 10
|
||||
)
|
||||
for _, resourceCount := range []int{1, 10, 50} {
|
||||
for _, metricsPerResource := range []int{10, 100} {
|
||||
b.Run(fmt.Sprintf("resources=%d/metrics=%d", resourceCount, metricsPerResource), func(b *testing.B) {
|
||||
settings := Settings{
|
||||
PromoteResourceAttributes: NewPromoteResourceAttributes(config.OTLPConfig{
|
||||
PromoteAllResourceAttributes: true,
|
||||
}),
|
||||
}
|
||||
payload := createMultiResourceExportRequest(
|
||||
resourceCount,
|
||||
resourceAttributeCount,
|
||||
metricsPerResource,
|
||||
labelsPerMetric,
|
||||
)
|
||||
appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry())
|
||||
noOpLogger := promslog.NewNopLogger()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
app := &noOpAppender{}
|
||||
mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics)
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
_, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,14 +21,13 @@ import (
|
||||
"math"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"go.opentelemetry.io/collector/pdata/pcommon"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric"
|
||||
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
)
|
||||
|
||||
func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, scope scope, meta Metadata,
|
||||
settings Settings, meta Metadata,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -37,9 +36,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
|
||||
|
||||
pt := dataPoints.At(x)
|
||||
labels, err := c.createAttributes(
|
||||
resource,
|
||||
pt.Attributes(),
|
||||
scope,
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
@ -71,7 +68,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
|
||||
}
|
||||
|
||||
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
|
||||
resource pcommon.Resource, settings Settings, scope scope, meta Metadata,
|
||||
settings Settings, meta Metadata,
|
||||
) error {
|
||||
for x := 0; x < dataPoints.Len(); x++ {
|
||||
if err := c.everyN.checkContext(ctx); err != nil {
|
||||
@ -80,9 +77,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
|
||||
|
||||
pt := dataPoints.At(x)
|
||||
lbls, err := c.createAttributes(
|
||||
resource,
|
||||
pt.Attributes(),
|
||||
scope,
|
||||
settings,
|
||||
nil,
|
||||
true,
|
||||
|
||||
@ -114,15 +114,19 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) {
|
||||
metric := tt.metric()
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
converter.addGaugeNumberDataPoints(
|
||||
context.Background(),
|
||||
metric.Gauge().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
tt.scope,
|
||||
settings,
|
||||
Metadata{
|
||||
MetricFamilyName: metric.Name(),
|
||||
},
|
||||
@ -344,15 +348,19 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
|
||||
metric := tt.metric()
|
||||
mockAppender := &mockCombinedAppender{}
|
||||
converter := NewPrometheusConverter(mockAppender)
|
||||
settings := Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
}
|
||||
resource := pcommon.NewResource()
|
||||
|
||||
// Initialize resource and scope context as FromMetrics would.
|
||||
require.NoError(t, converter.setResourceContext(resource, settings))
|
||||
require.NoError(t, converter.setScopeContext(tt.scope, settings))
|
||||
|
||||
converter.addSumNumberDataPoints(
|
||||
context.Background(),
|
||||
metric.Sum().DataPoints(),
|
||||
pcommon.NewResource(),
|
||||
Settings{
|
||||
PromoteScopeMetadata: tt.promoteScope,
|
||||
},
|
||||
tt.scope,
|
||||
settings,
|
||||
Metadata{
|
||||
MetricFamilyName: metric.Name(),
|
||||
},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user