OTLP receiver: Generate target_info samples between the earliest and latest samples per resource (#16737)

* OTLP receiver: Generate target_info samples between the earliest and latest samples per resource

Modify the OTLP receiver to generate target_info samples spanning the range from the
earliest to the latest sample timestamp per resource, instead of emitting only a single
sample at the latest timestamp.
The samples are spaced half the lookback delta (lookback delta / 2) apart, so that a
target_info sample is always within the lookback window of every metric sample.

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2025-07-04 16:38:16 +02:00 committed by GitHub
parent 819500bdbc
commit f561aa795d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 169 additions and 25 deletions

View File

@ -3,6 +3,7 @@
## main / unreleased
* [FEATURE] OTLP receiver: Support promoting OTel scope name/version/schema URL/attributes as metric labels, enable via configuration parameter `otlp.promote_scope_metadata`. #16730 #16760
* [BUGFIX] OTLP receiver: Generate `target_info` samples between the earliest and latest samples per resource. #16737
## 3.4.2 / 2025-06-26

View File

@ -25,6 +25,7 @@ import (
"slices"
"sort"
"strconv"
"time"
"unicode/utf8"
"github.com/cespare/xxhash/v2"
@ -53,10 +54,11 @@ const (
maxExemplarRunes = 128
// Trace and Span id keys are defined as part of the spec:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2
traceIDKey = "trace_id"
spanIDKey = "span_id"
infoType = "info"
targetMetricName = "target_info"
traceIDKey = "trace_id"
spanIDKey = "span_id"
infoType = "info"
targetMetricName = "target_info"
defaultLookbackDelta = 5 * time.Minute
)
type bucketBoundsData struct {
@ -416,39 +418,49 @@ func getPromExemplars[T exemplarType](ctx context.Context, everyN *everyNTimes,
return promExemplars, nil
}
// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics.
func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
var ts pcommon.Timestamp
// findMinAndMaxTimestamps returns the minimum of minTimestamp and the earliest timestamp in metric and
// the maximum of maxTimestamp and the latest timestamp in metric, respectively.
func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp pcommon.Timestamp) (pcommon.Timestamp, pcommon.Timestamp) {
// handle individual metric based on type
//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = max(ts, dataPoints.At(x).Timestamp())
ts := dataPoints.At(x).Timestamp()
minTimestamp = min(minTimestamp, ts)
maxTimestamp = max(maxTimestamp, ts)
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = max(ts, dataPoints.At(x).Timestamp())
ts := dataPoints.At(x).Timestamp()
minTimestamp = min(minTimestamp, ts)
maxTimestamp = max(maxTimestamp, ts)
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = max(ts, dataPoints.At(x).Timestamp())
ts := dataPoints.At(x).Timestamp()
minTimestamp = min(minTimestamp, ts)
maxTimestamp = max(maxTimestamp, ts)
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = max(ts, dataPoints.At(x).Timestamp())
ts := dataPoints.At(x).Timestamp()
minTimestamp = min(minTimestamp, ts)
maxTimestamp = max(maxTimestamp, ts)
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = max(ts, dataPoints.At(x).Timestamp())
ts := dataPoints.At(x).Timestamp()
minTimestamp = min(minTimestamp, ts)
maxTimestamp = max(maxTimestamp, ts)
}
}
return ts
return minTimestamp, maxTimestamp
}
func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
@ -581,8 +593,8 @@ func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi
}
// addResourceTargetInfo converts the resource to the target info metric.
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) {
if settings.DisableTargetInfo || timestamp == 0 {
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earliestTimestamp, latestTimestamp time.Time, converter *PrometheusConverter) {
if settings.DisableTargetInfo {
return
}
@ -628,12 +640,27 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
return
}
sample := &prompb.Sample{
Value: float64(1),
// convert ns to ms
Timestamp: convertTimeStamp(timestamp),
// Generate target_info samples starting at earliestTimestamp and ending at latestTimestamp,
// with a sample at every interval between them.
// Use an interval corresponding to half of the lookback delta, to ensure that target_info samples are found
// for the entirety of the relevant period.
if settings.LookbackDelta == 0 {
settings.LookbackDelta = defaultLookbackDelta
}
interval := settings.LookbackDelta / 2
ts, _ := converter.getOrCreateTimeSeries(labels)
for timestamp := earliestTimestamp; timestamp.Before(latestTimestamp); timestamp = timestamp.Add(interval) {
ts.Samples = append(ts.Samples, prompb.Sample{
Value: float64(1),
Timestamp: timestamp.UnixMilli(),
})
}
if len(ts.Samples) == 0 || ts.Samples[len(ts.Samples)-1].Timestamp < latestTimestamp.UnixMilli() {
ts.Samples = append(ts.Samples, prompb.Sample{
Value: float64(1),
Timestamp: latestTimestamp.UnixMilli(),
})
}
converter.addSample(sample, labels)
}
// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms.

View File

@ -20,7 +20,9 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"time"
"github.com/prometheus/otlptranslator"
"go.opentelemetry.io/collector/pdata/pcommon"
@ -50,6 +52,8 @@ type Settings struct {
AllowDeltaTemporality bool
// PromoteScopeMetadata controls whether to promote OTel scope metadata to metric labels.
PromoteScopeMetadata bool
// LookbackDelta is the PromQL engine lookback delta.
LookbackDelta time.Duration
}
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
@ -132,9 +136,10 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// keep track of the earliest and latest timestamp in the ResourceMetrics for
// use with the "target" info metric
var mostRecentTimestamp pcommon.Timestamp
earliestTimestamp := pcommon.Timestamp(math.MaxUint64)
latestTimestamp := pcommon.Timestamp(0)
for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
scope := newScopeFromScopeMetrics(scopeMetrics)
@ -148,7 +153,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
}
metric := metricSlice.At(k)
mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
earliestTimestamp, latestTimestamp = findMinAndMaxTimestamps(metric, earliestTimestamp, latestTimestamp)
temporality, hasTemporality, err := aggregationTemporality(metric)
if err != nil {
errs = multierr.Append(errs, err)
@ -264,7 +269,11 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
}
}
}
addResourceTargetInfo(resource, settings, mostRecentTimestamp, c)
if earliestTimestamp < pcommon.Timestamp(math.MaxUint64) {
// We have at least one metric sample for this resource.
// Generate a corresponding target_info series.
addResourceTargetInfo(resource, settings, earliestTimestamp.AsTime(), latestTimestamp.AsTime(), c)
}
}
return annots, errs

View File

@ -275,6 +275,100 @@ func TestFromMetrics(t *testing.T) {
"histogram data point has zero count, but non-zero sum: 155.000000",
}, ws)
})
t.Run("target_info's samples starts at the earliest metric sample timestamp and ends at the latest sample timestamp of the corresponding resource, with one sample every lookback delta/2 timestamps between", func(t *testing.T) {
request := pmetricotlp.NewExportRequest()
rm := request.Metrics().ResourceMetrics().AppendEmpty()
generateAttributes(rm.Resource().Attributes(), "resource", 5)
// Fake some resource attributes.
for k, v := range map[string]string{
"service.name": "test-service",
"service.namespace": "test-namespace",
"service.instance.id": "id1234",
} {
rm.Resource().Attributes().PutStr(k, v)
}
metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
ts := pcommon.NewTimestampFromTime(time.Now())
var expMetadata []prompb.MetricMetadata
for i := range 3 {
m := metrics.AppendEmpty()
m.SetEmptyGauge()
m.SetName(fmt.Sprintf("gauge-%v", i+1))
m.SetDescription("gauge")
m.SetUnit("unit")
// Add samples every lookback delta / 4 timestamps.
curTs := ts.AsTime()
for range 6 {
point := m.Gauge().DataPoints().AppendEmpty()
point.SetTimestamp(pcommon.NewTimestampFromTime(curTs))
point.SetDoubleValue(1.23)
generateAttributes(point.Attributes(), "series", 2)
curTs = curTs.Add(defaultLookbackDelta / 4)
}
namer := otlptranslator.MetricNamer{}
expMetadata = append(expMetadata, prompb.MetricMetadata{
Type: otelMetricTypeToPromMetricType(m),
MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(m)),
Help: m.Description(),
Unit: m.Unit(),
})
}
converter := NewPrometheusConverter()
annots, err := converter.FromMetrics(
context.Background(),
request.Metrics(),
Settings{
LookbackDelta: defaultLookbackDelta,
},
)
require.NoError(t, err)
require.Empty(t, annots)
testutil.RequireEqual(t, expMetadata, converter.Metadata())
timeSeries := converter.TimeSeries()
tgtInfoCount := 0
for _, s := range timeSeries {
b := labels.NewScratchBuilder(2)
lbls := s.ToLabels(&b, nil)
if lbls.Get(labels.MetricName) != "target_info" {
continue
}
tgtInfoCount++
require.Equal(t, "test-namespace/test-service", lbls.Get("job"))
require.Equal(t, "id1234", lbls.Get("instance"))
require.False(t, lbls.Has("service_name"))
require.False(t, lbls.Has("service_namespace"))
require.False(t, lbls.Has("service_instance_id"))
// There should be a target_info sample at the earliest metric timestamp, then two spaced lookback delta/2 apart,
// then one at the latest metric timestamp.
testutil.RequireEqual(t, []prompb.Sample{
{
Value: 1,
Timestamp: ts.AsTime().UnixMilli(),
},
{
Value: 1,
Timestamp: ts.AsTime().Add(defaultLookbackDelta / 2).UnixMilli(),
},
{
Value: 1,
Timestamp: ts.AsTime().Add(defaultLookbackDelta).UnixMilli(),
},
{
Value: 1,
Timestamp: ts.AsTime().Add(defaultLookbackDelta + defaultLookbackDelta/4).UnixMilli(),
},
}, s.Samples)
}
require.Equal(t, 1, tgtInfoCount)
})
}
func TestTemporality(t *testing.T) {

View File

@ -530,6 +530,9 @@ type OTLPOptions struct {
// marking the metric type as unknown for now).
// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
NativeDelta bool
// LookbackDelta is the query lookback delta.
// Used to calculate the target_info sample timestamp interval.
LookbackDelta time.Duration
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
@ -547,6 +550,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
},
config: configFunc,
allowDeltaTemporality: opts.NativeDelta,
lookbackDelta: opts.LookbackDelta,
}
wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex}
@ -583,6 +587,7 @@ type rwExporter struct {
*writeHandler
config func() config.Config
allowDeltaTemporality bool
lookbackDelta time.Duration
}
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
@ -597,6 +602,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er
ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB,
AllowDeltaTemporality: rw.allowDeltaTemporality,
PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata,
LookbackDelta: rw.lookbackDelta,
})
if err != nil {
rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)

View File

@ -264,6 +264,7 @@ func NewAPI(
acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool,
ctZeroIngestionEnabled bool,
lookbackDelta time.Duration,
) *API {
a := &API{
QueryEngine: qe,
@ -310,7 +311,11 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative, NativeDelta: otlpNativeDeltaIngestion})
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{
ConvertDelta: otlpDeltaToCumulative,
NativeDelta: otlpNativeDeltaIngestion,
LookbackDelta: lookbackDelta,
})
}
return a

View File

@ -145,6 +145,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
false,
false,
false,
5*time.Minute,
)
promRouter := route.New().WithPrefix("/api/v1")

View File

@ -392,6 +392,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
o.ConvertOTLPDelta,
o.NativeOTLPDeltaIngestion,
o.CTZeroIngestionEnabled,
o.LookbackDelta,
)
if o.RoutePrefix != "/" {