From f561aa795d1defeeb876bac6d0a6a8970568ccdc Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 4 Jul 2025 16:38:16 +0200 Subject: [PATCH] OTLP receiver: Generate `target_info` samples between the earliest and latest samples per resource (#16737) * OTLP receiver: Generate target_info samples between the earliest and latest samples per resource Modify the OTLP receiver to generate target_info samples between the earliest and latest samples per resource instead of only one for the latest timestamp. The samples are spaced lookback delta/2 apart. --------- Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + .../prometheusremotewrite/helper.go | 67 +++++++++---- .../prometheusremotewrite/metrics_to_prw.go | 17 +++- .../metrics_to_prw_test.go | 94 +++++++++++++++++++ storage/remote/write_handler.go | 6 ++ web/api/v1/api.go | 7 +- web/api/v1/errors_test.go | 1 + web/web.go | 1 + 8 files changed, 169 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 793a625bd4..7f171b78f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## main / unreleased * [FEATURE] OTLP receiver: Support promoting OTel scope name/version/schema URL/attributes as metric labels, enable via configuration parameter `otlp.promote_scope_metadata`. #16730 #16760 +* [BUGFIX] OTLP receiver: Generate `target_info` samples between the earliest and latest samples per resource. #16737 ## 3.4.2 / 2025-06-26 diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 111b6212fb..e6053df134 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -25,6 +25,7 @@ import ( "slices" "sort" "strconv" + "time" "unicode/utf8" "github.com/cespare/xxhash/v2" @@ -53,10 +54,11 @@ const ( maxExemplarRunes = 128 // Trace and Span id keys are defined as part of the spec: // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2 - traceIDKey = "trace_id" - spanIDKey = "span_id" - infoType = "info" - targetMetricName = "target_info" + traceIDKey = "trace_id" + spanIDKey = "span_id" + infoType = "info" + targetMetricName = "target_info" + defaultLookbackDelta = 5 * time.Minute ) type bucketBoundsData struct { @@ -416,39 +418,49 @@ func getPromExemplars[T exemplarType](ctx context.Context, everyN *everyNTimes, return promExemplars, nil } -// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics. -func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { - var ts pcommon.Timestamp +// findMinAndMaxTimestamps returns the minimum of minTimestamp and the earliest timestamp in metric and +// the maximum of maxTimestamp and the latest timestamp in metric, respectively. +func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp pcommon.Timestamp) (pcommon.Timestamp, pcommon.Timestamp) { // handle individual metric based on type //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() for x := 0; x < dataPoints.Len(); x++ { - ts = max(ts, dataPoints.At(x).Timestamp()) + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() for x := 0; x < dataPoints.Len(); x++ { - ts = max(ts, dataPoints.At(x).Timestamp()) + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() for x := 0; x < dataPoints.Len(); x++ { - ts = max(ts, dataPoints.At(x).Timestamp()) + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() for x := 0; x < dataPoints.Len(); x++ { - ts = max(ts, dataPoints.At(x).Timestamp()) + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) } case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() for x := 0; x < dataPoints.Len(); x++ { - ts = max(ts, dataPoints.At(x).Timestamp()) + ts := dataPoints.At(x).Timestamp() + minTimestamp = min(minTimestamp, ts) + maxTimestamp = max(maxTimestamp, ts) } } - return ts + return minTimestamp, maxTimestamp } func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, @@ -581,8 +593,8 @@ func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi } // addResourceTargetInfo converts the resource to the target info metric. -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) { - if settings.DisableTargetInfo || timestamp == 0 { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earliestTimestamp, latestTimestamp time.Time, converter *PrometheusConverter) { + if settings.DisableTargetInfo { return } @@ -628,12 +640,27 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta return } - sample := &prompb.Sample{ - Value: float64(1), - // convert ns to ms - Timestamp: convertTimeStamp(timestamp), + // Generate target_info samples starting at earliestTimestamp and ending at latestTimestamp, + // with a sample at every interval between them. + // Use an interval corresponding to half of the lookback delta, to ensure that target_info samples are found + // for the entirety of the relevant period. + if settings.LookbackDelta == 0 { + settings.LookbackDelta = defaultLookbackDelta + } + interval := settings.LookbackDelta / 2 + ts, _ := converter.getOrCreateTimeSeries(labels) + for timestamp := earliestTimestamp; timestamp.Before(latestTimestamp); timestamp = timestamp.Add(interval) { + ts.Samples = append(ts.Samples, prompb.Sample{ + Value: float64(1), + Timestamp: timestamp.UnixMilli(), + }) + } + if len(ts.Samples) == 0 || ts.Samples[len(ts.Samples)-1].Timestamp < latestTimestamp.UnixMilli() { + ts.Samples = append(ts.Samples, prompb.Sample{ + Value: float64(1), + Timestamp: latestTimestamp.UnixMilli(), + }) } - converter.addSample(sample, labels) } // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 8d5c73e37c..07837ef374 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -20,7 +20,9 @@ import ( "context" "errors" "fmt" + "math" "sort" + "time" "github.com/prometheus/otlptranslator" "go.opentelemetry.io/collector/pdata/pcommon" @@ -50,6 +52,8 @@ type Settings struct { AllowDeltaTemporality bool // PromoteScopeMetadata controls whether to promote OTel scope metadata to metric labels. PromoteScopeMetadata bool + // LookbackDelta is the PromQL engine lookback delta. + LookbackDelta time.Duration } // PrometheusConverter converts from OTel write format to Prometheus remote write format. @@ -132,9 +136,10 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric resourceMetrics := resourceMetricsSlice.At(i) resource := resourceMetrics.Resource() scopeMetricsSlice := resourceMetrics.ScopeMetrics() - // keep track of the most recent timestamp in the ResourceMetrics for + // keep track of the earliest and latest timestamp in the ResourceMetrics for // use with the "target" info metric - var mostRecentTimestamp pcommon.Timestamp + earliestTimestamp := pcommon.Timestamp(math.MaxUint64) + latestTimestamp := pcommon.Timestamp(0) for j := 0; j < scopeMetricsSlice.Len(); j++ { scopeMetrics := scopeMetricsSlice.At(j) scope := newScopeFromScopeMetrics(scopeMetrics) @@ -148,7 +153,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } metric := metricSlice.At(k) - mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) + earliestTimestamp, latestTimestamp = findMinAndMaxTimestamps(metric, earliestTimestamp, latestTimestamp) temporality, hasTemporality, err := aggregationTemporality(metric) if err != nil { errs = multierr.Append(errs, err) @@ -264,7 +269,11 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } } } - addResourceTargetInfo(resource, settings, mostRecentTimestamp, c) + if earliestTimestamp < pcommon.Timestamp(math.MaxUint64) { + // We have at least one metric sample for this resource. + // Generate a corresponding target_info series. + addResourceTargetInfo(resource, settings, earliestTimestamp.AsTime(), latestTimestamp.AsTime(), c) + } } return annots, errs diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 63ac6b0b0b..057dc63a7d 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -275,6 +275,100 @@ func TestFromMetrics(t *testing.T) { "histogram data point has zero count, but non-zero sum: 155.000000", }, ws) }) + + t.Run("target_info's samples starts at the earliest metric sample timestamp and ends at the latest sample timestamp of the corresponding resource, with one sample every lookback delta/2 timestamps between", func(t *testing.T) { + request := pmetricotlp.NewExportRequest() + rm := request.Metrics().ResourceMetrics().AppendEmpty() + generateAttributes(rm.Resource().Attributes(), "resource", 5) + + // Fake some resource attributes. + for k, v := range map[string]string{ + "service.name": "test-service", + "service.namespace": "test-namespace", + "service.instance.id": "id1234", + } { + rm.Resource().Attributes().PutStr(k, v) + } + metrics := rm.ScopeMetrics().AppendEmpty().Metrics() + ts := pcommon.NewTimestampFromTime(time.Now()) + + var expMetadata []prompb.MetricMetadata + for i := range 3 { + m := metrics.AppendEmpty() + m.SetEmptyGauge() + m.SetName(fmt.Sprintf("gauge-%v", i+1)) + m.SetDescription("gauge") + m.SetUnit("unit") + // Add samples every lookback delta / 4 timestamps. + curTs := ts.AsTime() + for range 6 { + point := m.Gauge().DataPoints().AppendEmpty() + point.SetTimestamp(pcommon.NewTimestampFromTime(curTs)) + point.SetDoubleValue(1.23) + generateAttributes(point.Attributes(), "series", 2) + curTs = curTs.Add(defaultLookbackDelta / 4) + } + + namer := otlptranslator.MetricNamer{} + expMetadata = append(expMetadata, prompb.MetricMetadata{ + Type: otelMetricTypeToPromMetricType(m), + MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(m)), + Help: m.Description(), + Unit: m.Unit(), + }) + } + + converter := NewPrometheusConverter() + annots, err := converter.FromMetrics( + context.Background(), + request.Metrics(), + Settings{ + LookbackDelta: defaultLookbackDelta, + }, + ) + require.NoError(t, err) + require.Empty(t, annots) + + testutil.RequireEqual(t, expMetadata, converter.Metadata()) + + timeSeries := converter.TimeSeries() + tgtInfoCount := 0 + for _, s := range timeSeries { + b := labels.NewScratchBuilder(2) + lbls := s.ToLabels(&b, nil) + if lbls.Get(labels.MetricName) != "target_info" { + continue + } + + tgtInfoCount++ + require.Equal(t, "test-namespace/test-service", lbls.Get("job")) + require.Equal(t, "id1234", lbls.Get("instance")) + require.False(t, lbls.Has("service_name")) + require.False(t, lbls.Has("service_namespace")) + require.False(t, lbls.Has("service_instance_id")) + // There should be a target_info sample at the earliest metric timestamp, then two spaced lookback delta/2 apart, + // then one at the latest metric timestamp. + testutil.RequireEqual(t, []prompb.Sample{ + { + Value: 1, + Timestamp: ts.AsTime().UnixMilli(), + }, + { + Value: 1, + Timestamp: ts.AsTime().Add(defaultLookbackDelta / 2).UnixMilli(), + }, + { + Value: 1, + Timestamp: ts.AsTime().Add(defaultLookbackDelta).UnixMilli(), + }, + { + Value: 1, + Timestamp: ts.AsTime().Add(defaultLookbackDelta + defaultLookbackDelta/4).UnixMilli(), + }, + }, s.Samples) + } + require.Equal(t, 1, tgtInfoCount) + }) } func TestTemporality(t *testing.T) { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index fa760f0b49..70c845f4aa 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -530,6 +530,9 @@ type OTLPOptions struct { // marking the metric type as unknown for now). // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) NativeDelta bool + // LookbackDelta is the query lookback delta. + // Used to calculate the target_info sample timestamp interval. + LookbackDelta time.Duration } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and @@ -547,6 +550,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl }, config: configFunc, allowDeltaTemporality: opts.NativeDelta, + lookbackDelta: opts.LookbackDelta, } wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex} @@ -583,6 +587,7 @@ type rwExporter struct { *writeHandler config func() config.Config allowDeltaTemporality bool + lookbackDelta time.Duration } func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -597,6 +602,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB, AllowDeltaTemporality: rw.allowDeltaTemporality, PromoteScopeMetadata: otlpCfg.PromoteScopeMetadata, + LookbackDelta: rw.lookbackDelta, }) if err != nil { rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index c924c9092c..5002fad27e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -264,6 +264,7 @@ func NewAPI( acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool, ctZeroIngestionEnabled bool, + lookbackDelta time.Duration, ) *API { a := &API{ QueryEngine: qe, @@ -310,7 +311,11 @@ func NewAPI( a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative, NativeDelta: otlpNativeDeltaIngestion}) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ + ConvertDelta: otlpDeltaToCumulative, + NativeDelta: otlpNativeDeltaIngestion, + LookbackDelta: lookbackDelta, + }) } return a diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index bb70792583..b3e95e2243 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -145,6 +145,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route false, false, false, + 5*time.Minute, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 601d42cbea..7280255f8b 100644 --- a/web/web.go +++ b/web/web.go @@ -392,6 +392,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.ConvertOTLPDelta, o.NativeOTLPDeltaIngestion, o.CTZeroIngestionEnabled, + o.LookbackDelta, ) if o.RoutePrefix != "/" {