diff --git a/CHANGELOG.md b/CHANGELOG.md index 23d2c89da8..5cc5b8f406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] TSDB: Fix the predicate checking for blocks which are beyond the retention period to include the ones right at the retention boundary. #9633 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974 +* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991 ## 2.51.2 / 2024-04-09 diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index de228e807a..68be82e443 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -219,7 +219,7 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { return false } -func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, +func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) @@ -395,7 +395,7 @@ func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { return ts } -func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, +func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) @@ -468,7 +468,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr // getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. // Otherwise it creates a new one and returns that, and true. -func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { +func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { h := timeSeriesSignature(lbls) ts := c.unique[h] if ts != nil { @@ -504,7 +504,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp // addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist. // If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp, // both converted to milliseconds. -func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { +func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { ts, created := c.getOrCreateTimeSeries(lbls) if created { ts.Samples = []prompb.Sample{ @@ -518,7 +518,7 @@ func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi } // addResourceTargetInfo converts the resource to the target info metric. -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) { if settings.DisableTargetInfo || timestamp == 0 { return } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index 45f1df123e..31d343fe4d 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -30,7 +30,7 @@ import ( const defaultZeroThreshold = 1e-128 -func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, +func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) error { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index a108306ba3..2d6aa30a13 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "sort" - "strconv" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -39,34 +38,21 @@ type Settings struct { SendMetadata bool } -// FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) { - c := newPrometheusConverter() - errs := c.fromMetrics(md, settings) - tss := c.timeSeries() - out := make(map[string]*prompb.TimeSeries, len(tss)) - for i := range tss { - out[strconv.Itoa(i)] = &tss[i] - } - - return out, errs -} - -// prometheusConverter converts from OTel write format to Prometheus write format. -type prometheusConverter struct { +// PrometheusConverter converts from OTel write format to Prometheus write format. +type PrometheusConverter struct { unique map[uint64]*prompb.TimeSeries conflicts map[uint64][]*prompb.TimeSeries } -func newPrometheusConverter() *prometheusConverter { - return &prometheusConverter{ +func NewPrometheusConverter() *PrometheusConverter { + return &PrometheusConverter{ unique: map[uint64]*prompb.TimeSeries{}, conflicts: map[uint64][]*prompb.TimeSeries{}, } } -// fromMetrics converts pmetric.Metrics to Prometheus remote write format. -func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) { +// FromMetrics converts pmetric.Metrics to Prometheus remote write format. +func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (errs error) { resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) @@ -144,8 +130,8 @@ func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) return } -// timeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. -func (c *prometheusConverter) timeSeries() []prompb.TimeSeries { +// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. +func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries { conflicts := 0 for _, ts := range c.conflicts { conflicts += len(ts) @@ -177,7 +163,7 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { // addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, // the exemplar is added to the bucket bound's time series, provided that the time series' has samples. -func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { +func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { if len(bucketBounds) == 0 { return } @@ -202,7 +188,7 @@ func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, // If there is no corresponding TimeSeries already, it's created. // The corresponding TimeSeries is returned. // If either lbls is nil/empty or sample is nil, nothing is done. -func (c *prometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { +func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { if sample == nil || len(lbls) == 0 { // This shouldn't happen return nil diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go new file mode 100644 index 0000000000..4797daeece --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -0,0 +1,134 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package prometheusremotewrite + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { + for _, resourceAttributeCount := range []int{0, 5, 50} { + b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { + for _, histogramCount := range []int{0, 1000} { + b.Run(fmt.Sprintf("histogram count: %v", histogramCount), func(b *testing.B) { + nonHistogramCounts := []int{0, 1000} + + if resourceAttributeCount == 0 && histogramCount == 0 { + // Don't bother running a scenario where we'll generate no series. + nonHistogramCounts = []int{1000} + } + + for _, nonHistogramCount := range nonHistogramCounts { + b.Run(fmt.Sprintf("non-histogram count: %v", nonHistogramCount), func(b *testing.B) { + for _, labelsPerMetric := range []int{2, 20} { + b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) { + for _, exemplarsPerSeries := range []int{0, 5, 10} { + b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { + payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) + + for i := 0; i < b.N; i++ { + converter := NewPrometheusConverter() + require.NoError(b, converter.FromMetrics(payload.Metrics(), Settings{})) + require.NotNil(b, converter.TimeSeries()) + } + }) + } + }) + } + }) + } + }) + } + }) + } +} + +func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest { + request := pmetricotlp.NewExportRequest() + + rm := request.Metrics().ResourceMetrics().AppendEmpty() + generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount) + + metrics := rm.ScopeMetrics().AppendEmpty().Metrics() + ts := pcommon.NewTimestampFromTime(time.Now()) + + for i := 1; i <= histogramCount; i++ { + m := metrics.AppendEmpty() + m.SetEmptyHistogram() + m.SetName(fmt.Sprintf("histogram-%v", i)) + m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + h := m.Histogram().DataPoints().AppendEmpty() + h.SetTimestamp(ts) + + // Set 50 samples, 10 each with values 0.5, 1, 2, 4, and 8 + h.SetCount(50) + h.SetSum(155) + h.BucketCounts().FromRaw([]uint64{10, 10, 10, 10, 10, 0}) + h.ExplicitBounds().FromRaw([]float64{.5, 1, 2, 4, 8, 16}) // Bucket boundaries include the upper limit (ie. each sample is on the upper limit of its bucket) + + generateAttributes(h.Attributes(), "series", labelsPerMetric) + generateExemplars(h.Exemplars(), exemplarsPerSeries, ts) + } + + for i := 1; i <= nonHistogramCount; i++ { + m := metrics.AppendEmpty() + m.SetEmptySum() + m.SetName(fmt.Sprintf("sum-%v", i)) + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + point := m.Sum().DataPoints().AppendEmpty() + point.SetTimestamp(ts) + point.SetDoubleValue(1.23) + generateAttributes(point.Attributes(), "series", labelsPerMetric) + generateExemplars(point.Exemplars(), exemplarsPerSeries, ts) + } + + for i := 1; i <= nonHistogramCount; i++ { + m := metrics.AppendEmpty() + m.SetEmptyGauge() + m.SetName(fmt.Sprintf("gauge-%v", i)) + point := m.Gauge().DataPoints().AppendEmpty() + point.SetTimestamp(ts) + point.SetDoubleValue(1.23) + generateAttributes(point.Attributes(), "series", labelsPerMetric) + generateExemplars(point.Exemplars(), exemplarsPerSeries, ts) + } + + return request +} + +func generateAttributes(m pcommon.Map, prefix string, count int) { + for i := 1; i <= count; i++ { + m.PutStr(fmt.Sprintf("%v-name-%v", prefix, i), fmt.Sprintf("value-%v", i)) + } +} + +func generateExemplars(exemplars pmetric.ExemplarSlice, count int, ts pcommon.Timestamp) { + for i := 1; i <= count; i++ { + e := exemplars.AppendEmpty() + e.SetTimestamp(ts) + e.SetDoubleValue(2.22) + e.SetSpanID(pcommon.SpanID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}) + e.SetTraceID(pcommon.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}) + } +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index 75c3d98459..aafebc6c46 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -27,7 +27,7 @@ import ( "github.com/prometheus/prometheus/prompb" ) -func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, settings Settings, name string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) @@ -57,7 +57,7 @@ func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number } } -func (c *prometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index bb6b8423a2..ff227292b8 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -208,21 +208,15 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - prwMetricsMap, errs := otlptranslator.FromMetrics(req.Metrics(), otlptranslator.Settings{ + converter := otlptranslator.NewPrometheusConverter() + if err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ AddMetricSuffixes: true, - }) - if errs != nil { - level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", errs) - } - - prwMetrics := make([]prompb.TimeSeries, 0, len(prwMetricsMap)) - - for _, ts := range prwMetricsMap { - prwMetrics = append(prwMetrics, *ts) + }); err != nil { + level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) } err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{ - Timeseries: prwMetrics, + Timeseries: converter.TimeSeries(), }) switch {