diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d30800b2a3..2623720bd1 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -282,12 +282,23 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "otlp-deltatocumulative": c.web.ConvertOTLPDelta = true logger.Info("Converting delta OTLP metrics to cumulative") + case "otlp-native-delta-ingestion": + // Experimental OTLP native delta ingestion. + // This currently just stores the raw delta value as-is with unknown metric type. Better typing and + // type-aware functions may come later. + // See proposal: https://github.com/prometheus/proposals/pull/48 + c.web.NativeOTLPDeltaIngestion = true + logger.Info("Enabling native ingestion of delta OTLP metrics, storing the raw sample values without conversion. WARNING: Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.") default: logger.Warn("Unknown option for --enable-feature", "option", o) } } } + if c.web.ConvertOTLPDelta && c.web.NativeOTLPDeltaIngestion { + return errors.New("cannot enable otlp-deltatocumulative and otlp-native-delta-ingestion features at the same time") + } + return nil } diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 4243ab9f2b..174184072e 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -168,7 +168,7 @@ recommended to update these files atomically. `--enable-feature=otlp-deltatocumulative` When enabled, Prometheus will convert OTLP metrics from delta temporality to their -cumulative equivalent, instead of dropping them. +cumulative equivalent, instead of dropping them. This cannot be enabled in conjunction with `otlp-native-delta-ingestion`. 
This uses [deltatocumulative][d2c] @@ -218,3 +218,32 @@ Examples of equivalent durations: * `(2 ^ 3) * 1m` is the equivalent to `8m` or `480s` [d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor + +## OTLP Native Delta Support + +`--enable-feature=otlp-native-delta-ingestion` + +When enabled, allows for the native ingestion of delta OTLP metrics, storing the raw sample values without conversion. This cannot be enabled in conjunction with `otlp-deltatocumulative`. + +Currently, the StartTimeUnixNano field is ignored, and deltas are given the unknown metric metadata type. + +Delta support is in a very early stage of development and the ingestion and querying process may change over time. For the open proposal see [prometheus/proposals#48](https://github.com/prometheus/proposals/pull/48). + +### Querying + +We encourage users to experiment with deltas and existing PromQL functions; we will collect feedback and likely build features to improve the experience around querying deltas. + +Note that standard PromQL counter functions like `rate()` and `increase()` are designed for cumulative metrics and will produce incorrect results when used with delta metrics. This may change in the future, but for now, to get similar results for delta metrics, you need `sum_over_time()`: + +* `sum_over_time(delta_metric[<range>])`: Calculates the sum of delta values over the specified time range. +* `sum_over_time(delta_metric[<range>]) / <range>`: Calculates the per-second rate of the delta metric. + +These may not work well if the `<range>` is not a multiple of the collection interval of the metric. For example, if you do `sum_over_time(delta_metric[1m]) / 1m` range query (with a 1m step), but the collection interval of a metric is 10m, the graph will show a single point every 10 minutes with a high rate value, rather than 10 points with a lower, constant value. 
+ +### Current gotchas + +* If delta metrics are exposed via [federation](https://prometheus.io/docs/prometheus/latest/federation/), data can be incorrectly collected if the ingestion interval is not the same as the scrape interval for the federated endpoint. + +* It is difficult to figure out whether a metric has delta or cumulative temporality, since there's no indication of temporality in metric names or labels. For now, if you are ingesting a mix of delta and cumulative metrics we advise you to explicitly add your own labels to distinguish them. In the future, we plan to introduce type labels to consistently distinguish metric types and potentially make PromQL functions type-aware (e.g. providing warnings when cumulative-only functions are used with delta metrics). + +* If there are multiple samples being ingested at the same timestamp, only one of the points is kept - the samples are **not** summed together (this is how Prometheus works in general - duplicate timestamp samples are rejected). Any aggregation will have to be done before sending samples to Prometheus. \ No newline at end of file diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 0660f8ee5f..09be335a8b 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -224,21 +224,19 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, setting return labels } -// isValidAggregationTemporality checks whether an OTel metric has a valid -// aggregation temporality for conversion to a Prometheus metric. 
-func isValidAggregationTemporality(metric pmetric.Metric) bool { +func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporality, bool, error) { //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: - return true + return 0, false, nil case pmetric.MetricTypeSum: - return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Sum().AggregationTemporality(), true, nil case pmetric.MetricTypeHistogram: - return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.Histogram().AggregationTemporality(), true, nil case pmetric.MetricTypeExponentialHistogram: - return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + return metric.ExponentialHistogram().AggregationTemporality(), true, nil } - return false + return 0, false, fmt.Errorf("could not get aggregation temporality for %s as it has unsupported metric type %s", metric.Name(), metric.Type()) } // addHistogramDataPoints adds OTel histogram data points to the corresponding Prometheus time series diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index db26b62925..6a405f104f 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -37,6 +37,7 @@ const defaultZeroThreshold = 1e-128 // as native histogram samples. 
func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice, resource pcommon.Resource, settings Settings, promName string, + temporality pmetric.AggregationTemporality, ) (annotations.Annotations, error) { var annots annotations.Annotations for x := 0; x < dataPoints.Len(); x++ { @@ -46,7 +47,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont pt := dataPoints.At(x) - histogram, ws, err := exponentialToNativeHistogram(pt) + histogram, ws, err := exponentialToNativeHistogram(pt, temporality) annots.Merge(ws) if err != nil { return annots, err @@ -76,7 +77,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont // exponentialToNativeHistogram translates an OTel Exponential Histogram data point // to a Prometheus Native Histogram. -func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { +func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (prompb.Histogram, annotations.Annotations, error) { var annots annotations.Annotations scale := p.Scale() if scale < -4 { @@ -94,17 +95,27 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true) nSpans, nDeltas := convertBucketsLayout(p.Negative().BucketCounts().AsRaw(), p.Negative().Offset(), scaleDown, true) + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. 
+ // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + resetHint := prompb.Histogram_UNKNOWN + + if temporality == pmetric.AggregationTemporalityDelta { + // If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting. + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/). + // This might be changed to a different hint name as gauge type might be misleading for samples that should be + // summed over time. + resetHint = prompb.Histogram_GAUGE + } + h := prompb.Histogram{ - // The counter reset detection must be compatible with Prometheus to - // safely set ResetHint to NO. This is not ensured currently. - // Sending a sample that triggers counter reset but with ResetHint==NO - // would lead to Prometheus panic as it does not double check the hint. - // Thus we're explicitly saying UNKNOWN here, which is always safe. - // TODO: using created time stamp should be accurate, but we - // need to know here if it was used for the detection. 
- // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 - // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 - ResetHint: prompb.Histogram_UNKNOWN, + ResetHint: resetHint, Schema: scale, ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, @@ -242,6 +253,7 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, resource pcommon.Resource, settings Settings, promName string, + temporality pmetric.AggregationTemporality, ) (annotations.Annotations, error) { var annots annotations.Annotations @@ -252,7 +264,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co pt := dataPoints.At(x) - histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt) + histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt, temporality) annots.Merge(ws) if err != nil { return annots, err @@ -281,7 +293,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co return annots, nil } -func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { +func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint, temporality pmetric.AggregationTemporality) (prompb.Histogram, annotations.Annotations, error) { var annots annotations.Annotations buckets := p.BucketCounts().AsRaw() @@ -289,18 +301,28 @@ func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (pr bucketCounts := buckets[offset:] positiveSpans, positiveDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false) + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. 
This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + resetHint := prompb.Histogram_UNKNOWN + + if temporality == pmetric.AggregationTemporalityDelta { + // If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting. + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/). + // This might be changed to a different hint name as gauge type might be misleading for samples that should be + // summed over time. + resetHint = prompb.Histogram_GAUGE + } + // TODO(carrieedwards): Add setting to limit maximum bucket count h := prompb.Histogram{ - // The counter reset detection must be compatible with Prometheus to - // safely set ResetHint to NO. This is not ensured currently. - // Sending a sample that triggers counter reset but with ResetHint==NO - // would lead to Prometheus panic as it does not double check the hint. - // Thus we're explicitly saying UNKNOWN here, which is always safe. - // TODO: using created time stamp should be accurate, but we - // need to know here if it was used for the detection. 
- // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 - // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 - ResetHint: prompb.Histogram_UNKNOWN, + ResetHint: resetHint, Schema: histogram.CustomBucketsSchema, PositiveSpans: positiveSpans, diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go index 63e453a535..8071b1c93c 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go @@ -566,7 +566,7 @@ func TestExponentialToNativeHistogram(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validateExponentialHistogramCount(t, tt.exponentialHist()) // Sanity check. - got, annots, err := exponentialToNativeHistogram(tt.exponentialHist()) + got, annots, err := exponentialToNativeHistogram(tt.exponentialHist(), pmetric.AggregationTemporalityCumulative) if tt.wantErrMessage != "" { require.ErrorContains(t, err, tt.wantErrMessage) return @@ -769,6 +769,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { ExportCreatedMetric: true, }, otlptranslator.BuildCompliantMetricName(metric, "", true), + pmetric.AggregationTemporalityCumulative, ) require.NoError(t, err) require.Empty(t, annots) @@ -972,7 +973,7 @@ func TestHistogramToCustomBucketsHistogram(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { validateHistogramCount(t, tt.hist()) - got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist()) + got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist(), pmetric.AggregationTemporalityCumulative) if tt.wantErrMessage != "" { require.ErrorContains(t, err, tt.wantErrMessage) return @@ -1137,6 +1138,7 @@ 
func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) { ConvertHistogramsToNHCB: true, }, otlptranslator.BuildCompliantMetricName(metric, "", true), + pmetric.AggregationTemporalityCumulative, ) require.NoError(t, err) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index d2e79e4b6f..79d127bb80 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -41,6 +41,7 @@ type Settings struct { PromoteResourceAttributes []string KeepIdentifyingResourceAttributes bool ConvertHistogramsToNHCB bool + AllowDeltaTemporality bool } // PrometheusConverter converts from OTel write format to Prometheus remote write format. @@ -91,8 +92,18 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric metric := metricSlice.At(k) mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) + temporality, hasTemporality, err := aggregationTemporality(metric) + if err != nil { + errs = multierr.Append(errs, err) + continue + } - if !isValidAggregationTemporality(metric) { + if hasTemporality && + // Cumulative temporality is always valid. + // Delta temporality is also valid if AllowDeltaTemporality is true. + // All other temporality values are invalid. 
+ !(temporality == pmetric.AggregationTemporalityCumulative || + (settings.AllowDeltaTemporality && temporality == pmetric.AggregationTemporalityDelta)) { errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) continue } @@ -144,7 +155,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric break } if settings.ConvertHistogramsToNHCB { - ws, err := c.addCustomBucketsHistogramDataPoints(ctx, dataPoints, resource, settings, promName) + ws, err := c.addCustomBucketsHistogramDataPoints(ctx, dataPoints, resource, settings, promName, temporality) annots.Merge(ws) if err != nil { errs = multierr.Append(errs, err) @@ -172,6 +183,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric resource, settings, promName, + temporality, ) annots.Merge(ws) if err != nil { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index d9f433d713..a222d741d1 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -19,6 +19,7 @@ package prometheusremotewrite import ( "context" "fmt" + "sort" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/util/testutil" ) func TestFromMetrics(t *testing.T) { @@ -235,6 +237,461 @@ func TestFromMetrics(t *testing.T) { }) } +func TestTemporality(t *testing.T) { + ts := time.Unix(100, 0) + + tests := []struct { + name string + allowDelta bool + convertToNHCB bool + inputSeries []pmetric.Metric + expectedSeries []prompb.TimeSeries + expectedError string + }{ + { + name: "all cumulative when delta not allowed", + allowDelta: false, + inputSeries: []pmetric.Metric{ + createOtelSum("test_metric_1", 
pmetric.AggregationTemporalityCumulative, ts), + createOtelSum("test_metric_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_metric_1", ts), + createPromFloatSeries("test_metric_2", ts), + }, + }, + { + name: "all delta when allowed", + allowDelta: true, + inputSeries: []pmetric.Metric{ + createOtelSum("test_metric_1", pmetric.AggregationTemporalityDelta, ts), + createOtelSum("test_metric_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_metric_1", ts), + createPromFloatSeries("test_metric_2", ts), + }, + }, + { + name: "mixed temporality when delta allowed", + allowDelta: true, + inputSeries: []pmetric.Metric{ + createOtelSum("test_metric_1", pmetric.AggregationTemporalityDelta, ts), + createOtelSum("test_metric_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_metric_1", ts), + createPromFloatSeries("test_metric_2", ts), + }, + }, + { + name: "delta rejected when not allowed", + allowDelta: false, + inputSeries: []pmetric.Metric{ + createOtelSum("test_metric_1", pmetric.AggregationTemporalityCumulative, ts), + createOtelSum("test_metric_2", pmetric.AggregationTemporalityDelta, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_metric_1", ts), + }, + expectedError: `invalid temporality and type combination for metric "test_metric_2"`, + }, + { + name: "unspecified temporality not allowed", + allowDelta: true, + inputSeries: []pmetric.Metric{ + createOtelSum("test_metric_1", pmetric.AggregationTemporalityCumulative, ts), + createOtelSum("test_metric_2", pmetric.AggregationTemporalityUnspecified, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_metric_1", ts), + }, + expectedError: `invalid temporality and type combination for metric "test_metric_2"`, + }, + { + name: "cumulative histogram", 
+ allowDelta: false, + inputSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_histogram", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_histogram", prompb.Histogram_UNKNOWN, ts), + }, + }, + { + name: "delta histogram when allowed", + allowDelta: true, + inputSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExponentialHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_histogram_1", prompb.Histogram_GAUGE, ts), + createPromNativeHistogramSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts), + }, + }, + { + name: "delta histogram when not allowed", + allowDelta: false, + inputSeries: []pmetric.Metric{ + createOtelExponentialHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExponentialHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNativeHistogramSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts), + }, + expectedError: `invalid temporality and type combination for metric "test_histogram_1"`, + }, + { + name: "cumulative histogram with buckets", + allowDelta: false, + convertToNHCB: true, + inputSeries: []pmetric.Metric{ + createOtelExplicitHistogram("test_histogram", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNHCBSeries("test_histogram", prompb.Histogram_UNKNOWN, ts), + }, + }, + { + name: "delta histogram with buckets when allowed", + allowDelta: true, + convertToNHCB: true, + inputSeries: []pmetric.Metric{ + createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), 
+ }, + expectedSeries: []prompb.TimeSeries{ + createPromNHCBSeries("test_histogram_1", prompb.Histogram_GAUGE, ts), + createPromNHCBSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts), + }, + }, + { + name: "delta histogram with buckets when not allowed", + allowDelta: false, + convertToNHCB: true, + inputSeries: []pmetric.Metric{ + createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromNHCBSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts), + }, + expectedError: `invalid temporality and type combination for metric "test_histogram_1"`, + }, + { + name: "delta histogram with buckets and convertToNHCB=false when not allowed", + allowDelta: false, + convertToNHCB: false, + inputSeries: []pmetric.Metric{ + createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: createPromClassicHistogramSeries("test_histogram_2", ts), + expectedError: `invalid temporality and type combination for metric "test_histogram_1"`, + }, + { + name: "delta histogram with buckets and convertToNHCB=false when allowed", + allowDelta: true, + convertToNHCB: false, + inputSeries: []pmetric.Metric{ + createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts), + createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts), + }, + expectedSeries: append( + createPromClassicHistogramSeries("test_histogram_1", ts), + createPromClassicHistogramSeries("test_histogram_2", ts)..., + ), + }, + { + name: "summary does not have temporality", + inputSeries: []pmetric.Metric{ + createOtelSummary("test_summary_1", ts), + }, + expectedSeries: createPromSummarySeries("test_summary_1", ts), + }, + { + name: 
"gauge does not have temporality", + inputSeries: []pmetric.Metric{ + createOtelGauge("test_gauge_1", ts), + }, + expectedSeries: []prompb.TimeSeries{ + createPromFloatSeries("test_gauge_1", ts), + }, + }, + { + name: "empty metric type errors", + inputSeries: []pmetric.Metric{ + createOtelEmptyType("test_empty"), + }, + expectedSeries: []prompb.TimeSeries{}, + expectedError: `could not get aggregation temporality for test_empty as it has unsupported metric type Empty`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + + for _, s := range tc.inputSeries { + s.CopyTo(sm.Metrics().AppendEmpty()) + } + + c := NewPrometheusConverter() + settings := Settings{ + AllowDeltaTemporality: tc.allowDelta, + ConvertHistogramsToNHCB: tc.convertToNHCB, + } + + _, err := c.FromMetrics(context.Background(), metrics, settings) + + if tc.expectedError != "" { + require.EqualError(t, err, tc.expectedError) + } else { + require.NoError(t, err) + } + + series := c.TimeSeries() + + // Sort series to make the test deterministic. 
+ testutil.RequireEqual(t, sortTimeSeries(tc.expectedSeries), sortTimeSeries(series)) + }) + } +} + +func createOtelSum(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + sum := m.SetEmptySum() + sum.SetAggregationTemporality(temporality) + dp := sum.DataPoints().AppendEmpty() + dp.SetDoubleValue(5) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +func createPromFloatSeries(name string, ts time.Time) prompb.TimeSeries { + return prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{ + Value: 5, + Timestamp: ts.UnixMilli(), + }}, + } +} + +func createOtelGauge(name string, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + gauge := m.SetEmptyGauge() + dp := gauge.DataPoints().AppendEmpty() + dp.SetDoubleValue(5) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +func createOtelExponentialHistogram(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + hist := m.SetEmptyExponentialHistogram() + hist.SetAggregationTemporality(temporality) + dp := hist.DataPoints().AppendEmpty() + dp.SetCount(1) + dp.SetSum(5) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +func createPromNativeHistogramSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries { + return prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Histograms: []prompb.Histogram{ + { + 
Count: &prompb.Histogram_CountInt{CountInt: 1}, + Sum: 5, + Schema: 0, + ZeroThreshold: 1e-128, + ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0}, + Timestamp: ts.UnixMilli(), + ResetHint: hint, + }, + }, + } +} + +func createOtelExplicitHistogram(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + hist := m.SetEmptyHistogram() + hist.SetAggregationTemporality(temporality) + dp := hist.DataPoints().AppendEmpty() + dp.SetCount(20) + dp.SetSum(30) + dp.BucketCounts().FromRaw([]uint64{10, 10, 0}) + dp.ExplicitBounds().FromRaw([]float64{1, 2}) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +func createPromNHCBSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries { + return prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "test_label", Value: "test_value"}, + }, + Histograms: []prompb.Histogram{ + { + Count: &prompb.Histogram_CountInt{CountInt: 20}, + Sum: 30, + Schema: -53, + ZeroThreshold: 0, + ZeroCount: nil, + PositiveSpans: []prompb.BucketSpan{ + { + Length: 3, + }, + }, + PositiveDeltas: []int64{10, 0, -10}, + CustomValues: []float64{1, 2}, + Timestamp: ts.UnixMilli(), + ResetHint: hint, + }, + }, + } +} + +func createPromClassicHistogramSeries(name string, ts time.Time) []prompb.TimeSeries { + return []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_bucket"}, + {Name: "le", Value: "1"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{Value: 10, Timestamp: ts.UnixMilli()}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_bucket"}, + {Name: "le", Value: "2"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}}, + }, + { + Labels: 
[]prompb.Label{ + {Name: "__name__", Value: name + "_bucket"}, + {Name: "le", Value: "+Inf"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_count"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_sum"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{Value: 30, Timestamp: ts.UnixMilli()}}, + }, + } +} + +func createOtelSummary(name string, ts time.Time) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + summary := m.SetEmptySummary() + dp := summary.DataPoints().AppendEmpty() + dp.SetCount(9) + dp.SetSum(18) + qv := dp.QuantileValues().AppendEmpty() + qv.SetQuantile(0.5) + qv.SetValue(2) + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.Attributes().PutStr("test_label", "test_value") + return m +} + +func createPromSummarySeries(name string, ts time.Time) []prompb.TimeSeries { + return []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_sum"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{ + Value: 18, + Timestamp: ts.UnixMilli(), + }}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name + "_count"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{ + Value: 9, + Timestamp: ts.UnixMilli(), + }}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: name}, + {Name: "quantile", Value: "0.5"}, + {Name: "test_label", Value: "test_value"}, + }, + Samples: []prompb.Sample{{ + Value: 2, + Timestamp: ts.UnixMilli(), + }}, + }, + } +} + +func createOtelEmptyType(name string) pmetric.Metric { + metrics := pmetric.NewMetricSlice() + m := metrics.AppendEmpty() + m.SetName(name) + return 
m +} + +func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries { + for i := range series { + sort.Slice(series[i].Labels, func(j, k int) bool { + return series[i].Labels[j].Name < series[i].Labels[k].Name + }) + } + + sort.Slice(series, func(i, j int) bool { + return fmt.Sprint(series[i].Labels) < fmt.Sprint(series[j].Labels) + }) + + return series +} + func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for _, resourceAttributeCount := range []int{0, 5, 50} { b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { diff --git a/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go b/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go index 359fc52522..716a6cd6f9 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/otlp_to_openmetrics_metadata.go @@ -31,12 +31,27 @@ func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMeta if otelMetric.Sum().IsMonotonic() { metricType = prompb.MetricMetadata_COUNTER } + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + if otelMetric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + metricType = prompb.MetricMetadata_UNKNOWN + } return metricType case pmetric.MetricTypeHistogram: + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. 
+ if otelMetric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + return prompb.MetricMetadata_UNKNOWN + } return prompb.MetricMetadata_HISTOGRAM case pmetric.MetricTypeSummary: return prompb.MetricMetadata_SUMMARY case pmetric.MetricTypeExponentialHistogram: + if otelMetric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta { + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + // We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now. + return prompb.MetricMetadata_UNKNOWN + } return prompb.MetricMetadata_HISTOGRAM } return prompb.MetricMetadata_UNKNOWN diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index fa77fc398f..d43edd78bb 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -526,20 +526,30 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora type OTLPOptions struct { // Convert delta samples to their cumulative equivalent by aggregating in-memory ConvertDelta bool + // Store the raw delta samples as metrics with unknown type (we don't have a proper type for delta yet, therefore + // marking the metric type as unknown for now). + // We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/) + NativeDelta bool } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendable storage.Appendable, configFunc func() config.Config, opts OTLPOptions) http.Handler { + if opts.NativeDelta && opts.ConvertDelta { + // This should be validated when iterating through feature flags, so not expected to fail here. 
+ panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time") + } + ex := &rwExporter{ writeHandler: &writeHandler{ logger: logger, appendable: appendable, }, - config: configFunc, + config: configFunc, + allowDeltaTemporality: opts.NativeDelta, } - wh := &otlpWriteHandler{logger: logger, cumul: ex} + wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex} if opts.ConvertDelta { fac := deltatocumulative.NewFactory() @@ -547,7 +557,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl ID: component.NewID(fac.Type()), TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}, } - d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.cumul) + d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.defaultConsumer) if err != nil { // fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor], // which only errors if: @@ -563,7 +573,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl // deltatocumulative does not error on start. 
see above for panic reasoning panic(err) } - wh.delta = d2c + wh.d2cConsumer = d2c } return wh @@ -571,7 +581,8 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl type rwExporter struct { *writeHandler - config func() config.Config + config func() config.Config + allowDeltaTemporality bool } func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -584,6 +595,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes, ConvertHistogramsToNHCB: otlpCfg.ConvertHistogramsToNHCB, + AllowDeltaTemporality: rw.allowDeltaTemporality, }) if err != nil { rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err) @@ -607,8 +619,8 @@ func (rw *rwExporter) Capabilities() consumer.Capabilities { type otlpWriteHandler struct { logger *slog.Logger - cumul consumer.Metrics // only cumulative - delta consumer.Metrics // delta capable + defaultConsumer consumer.Metrics // stores deltas as-is + d2cConsumer consumer.Metrics // converts deltas to cumulative } func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -620,13 +632,15 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } md := req.Metrics() - // if delta conversion enabled AND delta samples exist, use slower delta capable path - if h.delta != nil && hasDelta(md) { - err = h.delta.ConsumeMetrics(r.Context(), md) + // If deltatocumulative conversion enabled AND delta samples exist, use slower conversion path. + // While deltatocumulative can also accept cumulative metrics (and then just forwards them as-is), it currently + // holds a sync.Mutex when entering ConsumeMetrics. This is slow and not necessary when ingesting cumulative metrics. 
+ if h.d2cConsumer != nil && hasDelta(md) { + err = h.d2cConsumer.ConsumeMetrics(r.Context(), md) } else { - // deltatocumulative currently holds a sync.Mutex when entering ConsumeMetrics. - // This is slow and not necessary when no delta samples exist anyways - err = h.cumul.ConsumeMetrics(r.Context(), md) + // Otherwise use default consumer (alongside cumulative samples, this will accept delta samples and write as-is + // if native-delta-support is enabled). + err = h.defaultConsumer.ConsumeMetrics(r.Context(), md) } switch { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 82aff1c940..c924c9092c 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -262,7 +262,7 @@ func NewAPI( statsRenderer StatsRenderer, rwEnabled bool, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, - otlpEnabled, otlpDeltaToCumulative bool, + otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool, ctZeroIngestionEnabled bool, ) *API { a := &API{ @@ -310,7 +310,7 @@ func NewAPI( a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative}) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative, NativeDelta: otlpNativeDeltaIngestion}) } return a diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index eb929c33ce..bb70792583 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -144,6 +144,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route false, false, false, + false, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 21624141ad..84c4a2a529 100644 --- a/web/web.go +++ b/web/web.go @@ -290,6 +290,7 @@ type Options struct { EnableRemoteWriteReceiver 
bool EnableOTLPWriteReceiver bool ConvertOTLPDelta bool + NativeOTLPDeltaIngestion bool IsAgent bool CTZeroIngestionEnabled bool AppName string @@ -389,6 +390,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, o.ConvertOTLPDelta, + o.NativeOTLPDeltaIngestion, o.CTZeroIngestionEnabled, )