From c36e966bf84ac651711e9b9dbdad9668863f8475 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 28 Oct 2025 15:13:43 +0100 Subject: [PATCH] OTLP: de-duplicate target_info samples with conflicting timestamps (#17400) Add logic to the target_info metric generation in the OTLP endpoint, so that any samples with the same timestamp for the same (target_info) series are de-duplicated. It comes out of a user's bug report about duplicated target_info samples in Grafana Mimir (which uses the Prometheus target_info generation logic). If I'm not mistaken, duplicate target_info samples should stem from multiple resources in the same OTLP request being translated to the same target_info label set. It shouldn't be caused by a Prometheus bug. --- .../prometheusremotewrite/helper.go | 33 ++++++- .../prometheusremotewrite/metrics_to_prw.go | 9 ++ .../metrics_to_prw_test.go | 93 +++++++++++++++++++ 3 files changed, 133 insertions(+), 2 deletions(-) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index a27447a90f..aa54433836 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -562,12 +562,41 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s settings.LookbackDelta = defaultLookbackDelta } interval := settings.LookbackDelta / 2 + + // Deduplicate target_info samples with the same labelset and timestamp across + // multiple resources in the same batch. + labelsHash := lbls.Hash() + + var key targetInfoKey for timestamp := earliestTimestamp; timestamp.Before(latestTimestamp); timestamp = timestamp.Add(interval) { - if err := c.appender.AppendSample(lbls, meta, 0, timestamp.UnixMilli(), float64(1), nil); err != nil { + timestampMs := timestamp.UnixMilli() + key = targetInfoKey{ + labelsHash: labelsHash, + timestamp: timestampMs, + } + if _, exists := c.seenTargetInfo[key]; exists { + // Skip duplicate. + continue + } + + c.seenTargetInfo[key] = struct{}{} + if err := c.appender.AppendSample(lbls, meta, 0, timestampMs, float64(1), nil); err != nil { return err } } - return c.appender.AppendSample(lbls, meta, 0, latestTimestamp.UnixMilli(), float64(1), nil) + + // Append the final sample at latestTimestamp. + finalTimestampMs := latestTimestamp.UnixMilli() + key = targetInfoKey{ + labelsHash: labelsHash, + timestamp: finalTimestampMs, + } + if _, exists := c.seenTargetInfo[key]; exists { + return nil + } + + c.seenTargetInfo[key] = struct{}{} + return c.appender.AppendSample(lbls, meta, 0, finalTimestampMs, float64(1), nil) } // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index eb540fc611..f0e623a72c 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -68,6 +68,14 @@ type PrometheusConverter struct { scratchBuilder labels.ScratchBuilder builder *labels.Builder appender CombinedAppender + // seenTargetInfo tracks target_info samples within a batch to prevent duplicates. + seenTargetInfo map[targetInfoKey]struct{} +} + +// targetInfoKey uniquely identifies a target_info sample by its labelset and timestamp. +type targetInfoKey struct { + labelsHash uint64 + timestamp int64 } func NewPrometheusConverter(appender CombinedAppender) *PrometheusConverter { @@ -129,6 +137,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } unitNamer := otlptranslator.UnitNamer{} c.everyN = everyNTimes{n: 128} + c.seenTargetInfo = make(map[targetInfoKey]struct{}) resourceMetricsSlice := md.ResourceMetrics() numMetrics := 0 diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 6fd2c1e05e..e34a4c824c 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -363,6 +363,99 @@ func TestFromMetrics(t *testing.T) { }, }, mockAppender.samples[len(mockAppender.samples)-4:]) }) + + t.Run("target_info deduplication across multiple resources with same labels", func(t *testing.T) { + request := pmetricotlp.NewExportRequest() + ts := pcommon.NewTimestampFromTime(time.Now()) + + // Create two ResourceMetrics with identical resource attributes. + // Without deduplication, each would generate its own target_info samples, + // resulting in duplicates. + for range 2 { + rm := request.Metrics().ResourceMetrics().AppendEmpty() + generateAttributes(rm.Resource().Attributes(), "resource", 3) + + // Fake some resource attributes. + for k, v := range map[string]string{ + "service.name": "test-service", + "service.namespace": "test-namespace", + "service.instance.id": "id1234", + } { + rm.Resource().Attributes().PutStr(k, v) + } + metrics := rm.ScopeMetrics().AppendEmpty().Metrics() + + // Add metrics. + m := metrics.AppendEmpty() + m.SetEmptyGauge() + m.SetName("gauge-1") + m.SetDescription("gauge") + m.SetUnit("unit") + + point1 := m.Gauge().DataPoints().AppendEmpty() + point1.SetTimestamp(ts) + point1.SetDoubleValue(1.23) + generateAttributes(point1.Attributes(), "series", 1) + + point2 := m.Gauge().DataPoints().AppendEmpty() + point2.SetTimestamp(pcommon.NewTimestampFromTime(ts.AsTime().Add(defaultLookbackDelta / 2))) + point2.SetDoubleValue(2.34) + generateAttributes(point2.Attributes(), "series", 1) + } + + mockAppender := &mockCombinedAppender{} + converter := NewPrometheusConverter(mockAppender) + annots, err := converter.FromMetrics( + context.Background(), + request.Metrics(), + Settings{ + LookbackDelta: defaultLookbackDelta, + }, + ) + require.NoError(t, err) + require.Empty(t, annots) + require.NoError(t, mockAppender.Commit()) + + var targetInfoSamples []combinedSample + for _, s := range mockAppender.samples { + if s.ls.Get(labels.MetricName) == "target_info" { + targetInfoSamples = append(targetInfoSamples, s) + } + } + + // Should have exactly 2 target_info samples (at ts and ts + lookbackDelta/2), + // not 4 (which would happen if both resources generated their own target_info samples). + require.Len(t, targetInfoSamples, 2) + + targetInfoLabels := labels.FromStrings( + "__name__", "target_info", + "instance", "id1234", + "job", "test-namespace/test-service", + "resource_name_1", "value-1", + "resource_name_2", "value-2", + "resource_name_3", "value-3", + ) + targetInfoMeta := metadata.Metadata{ + Type: model.MetricTypeGauge, + Help: "Target metadata", + } + requireEqual(t, []combinedSample{ + { + metricFamilyName: "target_info", + v: 1, + t: ts.AsTime().UnixMilli(), + ls: targetInfoLabels, + meta: targetInfoMeta, + }, + { + metricFamilyName: "target_info", + v: 1, + t: ts.AsTime().Add(defaultLookbackDelta / 2).UnixMilli(), + ls: targetInfoLabels, + meta: targetInfoMeta, + }, + }, targetInfoSamples) + }) } func TestTemporality(t *testing.T) {