diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 07a2809348..8d5c73e37c 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -77,7 +77,7 @@ func TranslatorMetricFromOtelMetric(metric pmetric.Metric) otlptranslator.Metric case pmetric.MetricTypeGauge: m.Type = otlptranslator.MetricTypeGauge case pmetric.MetricTypeSum: - if metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative { + if metric.Sum().IsMonotonic() { m.Type = otlptranslator.MetricTypeMonotonicCounter } else { m.Type = otlptranslator.MetricTypeNonMonotonicCounter diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index 776f0161d1..63ac6b0b0b 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -23,7 +23,6 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" "github.com/prometheus/otlptranslator" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -36,67 +35,105 @@ import ( ) func TestFromMetrics(t *testing.T) { - for _, keepIdentifyingResourceAttributes := range []bool{false, true} { - t.Run(fmt.Sprintf("successful/keepIdentifyingAttributes=%v", keepIdentifyingResourceAttributes), func(t *testing.T) { - converter := NewPrometheusConverter() - payload := createExportRequest(5, 128, 128, 2, 0) - var expMetadata []prompb.MetricMetadata - resourceMetricsSlice := payload.Metrics().ResourceMetrics() - for i := 0; i < resourceMetricsSlice.Len(); i++ { - scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics() - for j := 0; j < scopeMetricsSlice.Len(); j++ { - metricSlice := scopeMetricsSlice.At(j).Metrics() - for k := 0; k < metricSlice.Len(); k++ { - metric := metricSlice.At(k) - namer := otlptranslator.MetricNamer{} - promName := namer.Build(TranslatorMetricFromOtelMetric(metric)) - expMetadata = append(expMetadata, prompb.MetricMetadata{ - Type: otelMetricTypeToPromMetricType(metric), - MetricFamilyName: promName, - Help: metric.Description(), - Unit: metric.Unit(), - }) + t.Run("Successful", func(t *testing.T) { + for _, tc := range []struct { + name string + settings Settings + temporality pmetric.AggregationTemporality + }{ + { + name: "Default with cumulative temporality", + settings: Settings{}, + temporality: pmetric.AggregationTemporalityCumulative, + }, + { + name: "Default with delta temporality", + settings: Settings{ + AllowDeltaTemporality: true, + }, + temporality: pmetric.AggregationTemporalityDelta, + }, + { + name: "Keep identifying attributes", + settings: Settings{ + KeepIdentifyingResourceAttributes: true, + }, + temporality: pmetric.AggregationTemporalityCumulative, + }, + { + name: "Add metric suffixes with cumulative temporality", + settings: Settings{ + AddMetricSuffixes: true, + }, + temporality: pmetric.AggregationTemporalityCumulative, + }, + { + name: "Add metric suffixes with delta temporality", + settings: Settings{ + AddMetricSuffixes: true, + AllowDeltaTemporality: true, + }, + temporality: pmetric.AggregationTemporalityDelta, + }, + } { + t.Run(tc.name, func(t *testing.T) { + converter := NewPrometheusConverter() + payload, wantPromMetrics := createExportRequest(5, 128, 128, 2, 0, tc.settings, tc.temporality) + var expMetadata []prompb.MetricMetadata + seenFamilyNames := map[string]struct{}{} + for _, wantMetric := range wantPromMetrics { + if _, exists := seenFamilyNames[wantMetric.familyName]; exists { + continue + } + if wantMetric.familyName == "target_info" { + continue + } + + seenFamilyNames[wantMetric.familyName] = struct{}{} + expMetadata = append(expMetadata, prompb.MetricMetadata{ + Type: wantMetric.metricType, + MetricFamilyName: wantMetric.familyName, + Help: wantMetric.description, + Unit: wantMetric.unit, + }) + } + + annots, err := converter.FromMetrics( + context.Background(), + payload.Metrics(), + tc.settings, + ) + require.NoError(t, err) + require.Empty(t, annots) + + testutil.RequireEqual(t, expMetadata, converter.Metadata()) + + ts := converter.TimeSeries() + require.Len(t, ts, 1536+1) // +1 for the target_info. + + tgtInfoCount := 0 + for _, s := range ts { + b := labels.NewScratchBuilder(2) + lbls := s.ToLabels(&b, nil) + if lbls.Get(labels.MetricName) == "target_info" { + tgtInfoCount++ + require.Equal(t, "test-namespace/test-service", lbls.Get("job")) + require.Equal(t, "id1234", lbls.Get("instance")) + if tc.settings.KeepIdentifyingResourceAttributes { + require.Equal(t, "test-service", lbls.Get("service_name")) + require.Equal(t, "test-namespace", lbls.Get("service_namespace")) + require.Equal(t, "id1234", lbls.Get("service_instance_id")) + } else { + require.False(t, lbls.Has("service_name")) + require.False(t, lbls.Has("service_namespace")) + require.False(t, lbls.Has("service_instance_id")) + } } } - } - - annots, err := converter.FromMetrics( - context.Background(), - payload.Metrics(), - Settings{KeepIdentifyingResourceAttributes: keepIdentifyingResourceAttributes}, - ) - require.NoError(t, err) - require.Empty(t, annots) - - if diff := cmp.Diff(expMetadata, converter.Metadata()); diff != "" { - t.Errorf("mismatch (-want +got):\n%s", diff) - } - - ts := converter.TimeSeries() - require.Len(t, ts, 1408+1) // +1 for the target_info. - - tgtInfoCount := 0 - for _, s := range ts { - b := labels.NewScratchBuilder(2) - lbls := s.ToLabels(&b, nil) - if lbls.Get(labels.MetricName) == "target_info" { - tgtInfoCount++ - require.Equal(t, "test-namespace/test-service", lbls.Get("job")) - require.Equal(t, "id1234", lbls.Get("instance")) - if keepIdentifyingResourceAttributes { - require.Equal(t, "test-service", lbls.Get("service_name")) - require.Equal(t, "test-namespace", lbls.Get("service_namespace")) - require.Equal(t, "id1234", lbls.Get("service_instance_id")) - } else { - require.False(t, lbls.Has("service_name")) - require.False(t, lbls.Has("service_namespace")) - require.False(t, lbls.Has("service_instance_id")) - } - } - } - require.Equal(t, 1, tgtInfoCount) - }) - } + require.Equal(t, 1, tgtInfoCount) + }) + } + }) for _, convertHistogramsToNHCB := range []bool{false, true} { t.Run(fmt.Sprintf("successful/convertHistogramsToNHCB=%v", convertHistogramsToNHCB), func(t *testing.T) { @@ -144,25 +181,27 @@ func TestFromMetrics(t *testing.T) { } t.Run("context cancellation", func(t *testing.T) { + settings := Settings{} converter := NewPrometheusConverter() ctx, cancel := context.WithCancel(context.Background()) // Verify that converter.FromMetrics respects cancellation. cancel() - payload := createExportRequest(5, 128, 128, 2, 0) + payload, _ := createExportRequest(5, 128, 128, 2, 0, settings, pmetric.AggregationTemporalityCumulative) - annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}) + annots, err := converter.FromMetrics(ctx, payload.Metrics(), settings) require.ErrorIs(t, err, context.Canceled) require.Empty(t, annots) }) t.Run("context timeout", func(t *testing.T) { + settings := Settings{} converter := NewPrometheusConverter() // Verify that converter.FromMetrics respects timeout. ctx, cancel := context.WithTimeout(context.Background(), 0) t.Cleanup(cancel) - payload := createExportRequest(5, 128, 128, 2, 0) + payload, _ := createExportRequest(5, 128, 128, 2, 0, settings, pmetric.AggregationTemporalityCumulative) - annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}) + annots, err := converter.FromMetrics(ctx, payload.Metrics(), settings) require.ErrorIs(t, err, context.DeadlineExceeded) require.Empty(t, annots) }) @@ -693,6 +732,139 @@ func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries { return series } +func TestTranslatorMetricFromOtelMetric(t *testing.T) { + tests := []struct { + name string + inputMetric pmetric.Metric + expectedMetric otlptranslator.Metric + }{ + { + name: "gauge metric", + inputMetric: createOTelGaugeForTranslator("test_gauge", "bytes", "Test gauge metric"), + expectedMetric: otlptranslator.Metric{ + Name: "test_gauge", + Unit: "bytes", + Type: otlptranslator.MetricTypeGauge, + }, + }, + { + name: "monotonic sum metric", + inputMetric: createOTelSumForTranslator("test_sum", "count", "Test sum metric", true), + expectedMetric: otlptranslator.Metric{ + Name: "test_sum", + Unit: "count", + Type: otlptranslator.MetricTypeMonotonicCounter, + }, + }, + { + name: "non-monotonic sum metric", + inputMetric: createOTelSumForTranslator("test_sum", "count", "Test sum metric", false), + expectedMetric: otlptranslator.Metric{ + Name: "test_sum", + Unit: "count", + Type: otlptranslator.MetricTypeNonMonotonicCounter, + }, + }, + { + name: "histogram metric", + inputMetric: createOTelHistogramForTranslator("test_histogram", "seconds", "Test histogram metric"), + expectedMetric: otlptranslator.Metric{ + Name: "test_histogram", + Unit: "seconds", + Type: otlptranslator.MetricTypeHistogram, + }, + }, + { + name: "exponential histogram metric", + inputMetric: createOTelExponentialHistogramForTranslator("test_exp_histogram", "milliseconds", "Test exponential histogram metric"), + expectedMetric: otlptranslator.Metric{ + Name: "test_exp_histogram", + Unit: "milliseconds", + Type: otlptranslator.MetricTypeExponentialHistogram, + }, + }, + { + name: "summary metric", + inputMetric: createOTelSummaryForTranslator("test_summary", "duration", "Test summary metric"), + expectedMetric: otlptranslator.Metric{ + Name: "test_summary", + Unit: "duration", + Type: otlptranslator.MetricTypeSummary, + }, + }, + { + name: "empty metric name and unit", + inputMetric: createOTelGaugeForTranslator("", "", ""), + expectedMetric: otlptranslator.Metric{ + Name: "", + Unit: "", + Type: otlptranslator.MetricTypeGauge, + }, + }, + { + name: "empty metric type defaults to unknown", + inputMetric: createOTelEmptyMetricForTranslator("test_empty"), + expectedMetric: otlptranslator.Metric{ + Name: "test_empty", + Unit: "", + Type: otlptranslator.MetricTypeUnknown, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := TranslatorMetricFromOtelMetric(tc.inputMetric) + require.Equal(t, tc.expectedMetric, result) + }) + } +} + +func createOTelMetricForTranslator(name, unit, description string) pmetric.Metric { + m := pmetric.NewMetric() + m.SetName(name) + m.SetUnit(unit) + m.SetDescription(description) + return m +} + +func createOTelGaugeForTranslator(name, unit, description string) pmetric.Metric { + m := createOTelMetricForTranslator(name, unit, description) + m.SetEmptyGauge() + return m +} + +func createOTelSumForTranslator(name, unit, description string, isMonotonic bool) pmetric.Metric { + m := createOTelMetricForTranslator(name, unit, description) + sum := m.SetEmptySum() + sum.SetIsMonotonic(isMonotonic) + return m +} + +func createOTelHistogramForTranslator(name, unit, description string) pmetric.Metric { + m := createOTelMetricForTranslator(name, unit, description) + m.SetEmptyHistogram() + return m +} + +func createOTelExponentialHistogramForTranslator(name, unit, description string) pmetric.Metric { + m := createOTelMetricForTranslator(name, unit, description) + m.SetEmptyExponentialHistogram() + return m +} + +func createOTelSummaryForTranslator(name, unit, description string) pmetric.Metric { + m := createOTelMetricForTranslator(name, unit, description) + m.SetEmptySummary() + return m +} + +func createOTelEmptyMetricForTranslator(name string) pmetric.Metric { + m := pmetric.NewMetric() + m.SetName(name) + return m +} + func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for _, resourceAttributeCount := range []int{0, 5, 50} { b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { @@ -711,12 +883,21 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) { for _, exemplarsPerSeries := range []int{0, 5, 10} { b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { - payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) + settings := Settings{} + payload, _ := createExportRequest( + resourceAttributeCount, + histogramCount, + nonHistogramCount, + labelsPerMetric, + exemplarsPerSeries, + settings, + pmetric.AggregationTemporalityCumulative, + ) b.ResetTimer() for range b.N { converter := NewPrometheusConverter() - annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{}) + annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) require.Empty(b, annots) require.NotNil(b, converter.TimeSeries()) @@ -734,7 +915,15 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { } } -func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries int) pmetricotlp.ExportRequest { +type wantPrometheusMetric struct { + name string + familyName string + metricType prompb.MetricMetadata_MetricType + description string + unit string +} + +func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries int, settings Settings, temporality pmetric.AggregationTemporality) (pmetricotlp.ExportRequest, []wantPrometheusMetric) { request := pmetricotlp.NewExportRequest() rm := request.Metrics().ResourceMetrics().AppendEmpty() @@ -752,13 +941,18 @@ func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCou metrics := rm.ScopeMetrics().AppendEmpty().Metrics() ts := pcommon.NewTimestampFromTime(time.Now()) + var suffix string + if settings.AddMetricSuffixes { + suffix = "_unit" + } + var wantPromMetrics []wantPrometheusMetric for i := 1; i <= histogramCount; i++ { m := metrics.AppendEmpty() m.SetEmptyHistogram() m.SetName(fmt.Sprintf("histogram-%v", i)) m.SetDescription("histogram") m.SetUnit("unit") - m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + m.Histogram().SetAggregationTemporality(temporality) h := m.Histogram().DataPoints().AppendEmpty() h.SetTimestamp(ts) @@ -770,20 +964,96 @@ func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCou generateAttributes(h.Attributes(), "series", labelsPerMetric) generateExemplars(h.Exemplars(), exemplarsPerSeries, ts) + + metricType := prompb.MetricMetadata_HISTOGRAM + if temporality != pmetric.AggregationTemporalityCumulative { + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + metricType = prompb.MetricMetadata_UNKNOWN + } + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("histogram_%d%s_bucket", i, suffix), + familyName: fmt.Sprintf("histogram_%d%s", i, suffix), + metricType: metricType, + unit: "unit", + description: "histogram", + }) + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("histogram_%d%s_count", i, suffix), + familyName: fmt.Sprintf("histogram_%d%s", i, suffix), + metricType: metricType, + unit: "unit", + description: "histogram", + }) + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("histogram_%d%s_sum", i, suffix), + familyName: fmt.Sprintf("histogram_%d%s", i, suffix), + metricType: metricType, + unit: "unit", + description: "histogram", + }) } for i := 1; i <= nonHistogramCount; i++ { m := metrics.AppendEmpty() m.SetEmptySum() - m.SetName(fmt.Sprintf("sum-%v", i)) + m.SetName(fmt.Sprintf("non.monotonic.sum-%v", i)) m.SetDescription("sum") m.SetUnit("unit") - m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + m.Sum().SetAggregationTemporality(temporality) point := m.Sum().DataPoints().AppendEmpty() point.SetTimestamp(ts) point.SetDoubleValue(1.23) generateAttributes(point.Attributes(), "series", labelsPerMetric) generateExemplars(point.Exemplars(), exemplarsPerSeries, ts) + + metricType := prompb.MetricMetadata_GAUGE + if temporality != pmetric.AggregationTemporalityCumulative { + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + metricType = prompb.MetricMetadata_UNKNOWN + } + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("non_monotonic_sum_%d%s", i, suffix), + familyName: fmt.Sprintf("non_monotonic_sum_%d%s", i, suffix), + metricType: metricType, + unit: "unit", + description: "sum", + }) + } + + for i := 1; i <= nonHistogramCount; i++ { + m := metrics.AppendEmpty() + m.SetEmptySum() + m.SetName(fmt.Sprintf("monotonic.sum-%v", i)) + m.SetDescription("sum") + m.SetUnit("unit") + m.Sum().SetAggregationTemporality(temporality) + m.Sum().SetIsMonotonic(true) + point := m.Sum().DataPoints().AppendEmpty() + point.SetTimestamp(ts) + point.SetDoubleValue(1.23) + generateAttributes(point.Attributes(), "series", labelsPerMetric) + generateExemplars(point.Exemplars(), exemplarsPerSeries, ts) + + var counterSuffix string + if settings.AddMetricSuffixes { + counterSuffix = suffix + "_total" + } + + metricType := prompb.MetricMetadata_COUNTER + if temporality != pmetric.AggregationTemporalityCumulative { + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + metricType = prompb.MetricMetadata_UNKNOWN + } + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("monotonic_sum_%d%s", i, counterSuffix), + familyName: fmt.Sprintf("monotonic_sum_%d%s", i, counterSuffix), + metricType: metricType, + unit: "unit", + description: "sum", + }) } for i := 1; i <= nonHistogramCount; i++ { @@ -797,9 +1067,21 @@ func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCou point.SetDoubleValue(1.23) generateAttributes(point.Attributes(), "series", labelsPerMetric) generateExemplars(point.Exemplars(), exemplarsPerSeries, ts) + + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: fmt.Sprintf("gauge_%d%s", i, suffix), + familyName: fmt.Sprintf("gauge_%d%s", i, suffix), + metricType: prompb.MetricMetadata_GAUGE, + unit: "unit", + description: "gauge", + }) } - return request + wantPromMetrics = append(wantPromMetrics, wantPrometheusMetric{ + name: "target_info", + familyName: "target_info", + }) + return request, wantPromMetrics } func generateAttributes(m pcommon.Map, prefix string, count int) {