Add primitive support for ingesting OTLP delta metrics as-is (#16360)

* Add simple delta support

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Rename delta2cumulative part

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Whoops bad refactor

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add example yml

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Feature flag instead and histogram hint handling

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Delete otel_delta.yml - outdated

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Renaming to native delta support

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add more explanatory comments

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add more explanation to histograms

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Correct comment on d2c consumer

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add tests for counters and fix bug

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add histogram tests

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add docs

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Sort series to make test deterministic

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* More formatting

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Change flag name to ingestion

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Explain where rate calculation can go wrong

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add warning about duplicate timestamps

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Update docs/feature_flags.md

Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Fix tests

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Remove unnecessary if

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Add warning to d2c section

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Make unknown type error when getting temporality

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Correct type comment - not planning to add delta metric metadata type

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Remove unused param for empty type

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Rewrite temporality logic to be clearer

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

* Change spurious to unnecessary - better description

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>

---------

Signed-off-by: Fiona Liao <fiona.liao@grafana.com>
Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
Commit 7ec63b1fa1 (parent 23d73e8138) by Fiona Liao, 2025-04-23 13:58:02 +01:00, committed by GitHub (GPG Key ID: B5690EEEBB952194).
12 changed files with 615 additions and 52 deletions


@@ -282,12 +282,23 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
 			case "otlp-deltatocumulative":
 				c.web.ConvertOTLPDelta = true
 				logger.Info("Converting delta OTLP metrics to cumulative")
+			case "otlp-native-delta-ingestion":
+				// Experimental OTLP native delta ingestion.
+				// This currently just stores the raw delta value as-is with unknown metric type. Better typing and
+				// type-aware functions may come later.
+				// See proposal: https://github.com/prometheus/proposals/pull/48
+				c.web.NativeOTLPDeltaIngestion = true
+				logger.Info("Enabling native ingestion of delta OTLP metrics, storing the raw sample values without conversion. WARNING: Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")
 			default:
 				logger.Warn("Unknown option for --enable-feature", "option", o)
 			}
 		}
 	}
+	if c.web.ConvertOTLPDelta && c.web.NativeOTLPDeltaIngestion {
+		return errors.New("cannot enable otlp-deltatocumulative and otlp-native-delta-ingestion features at the same time")
+	}
 	return nil
 }


@@ -168,7 +168,7 @@ recommended to update these files atomically.
 `--enable-feature=otlp-deltatocumulative`
 When enabled, Prometheus will convert OTLP metrics from delta temporality to their
-cumulative equivalent, instead of dropping them.
+cumulative equivalent, instead of dropping them. This cannot be enabled in conjunction with `otlp-native-delta-ingestion`.
 This uses
 [deltatocumulative][d2c]
@@ -218,3 +218,32 @@ Examples of equivalent durations:
* `(2 ^ 3) * 1m` is equivalent to `8m` or `480s`
[d2c]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor
## OTLP Native Delta Support
`--enable-feature=otlp-native-delta-ingestion`
When enabled, allows for the native ingestion of delta OTLP metrics, storing the raw sample values without conversion. This cannot be enabled in conjunction with `otlp-deltatocumulative`.
Currently, the `StartTimeUnixNano` field is ignored, and deltas are given the unknown metric metadata type.
Delta support is in a very early stage of development and the ingestion and querying process may change over time. For the open proposal see [prometheus/proposals#48](https://github.com/prometheus/proposals/pull/48).
### Querying
We encourage users to experiment with deltas and existing PromQL functions; we will collect feedback and likely build features to improve the experience around querying deltas.
Note that standard PromQL counter functions like `rate()` and `increase()` are designed for cumulative metrics and will produce incorrect results when used with delta metrics. This may change in the future, but for now, to get similar results for delta metrics, you need `sum_over_time()`:
* `sum_over_time(delta_metric[<range>])`: Calculates the sum of delta values over the specified time range.
* `sum_over_time(delta_metric[<range>]) / <range>`: Calculates the per-second rate of the delta metric.
These may not work well if the `<range>` is not a multiple of the collection interval of the metric. For example, if you run a `sum_over_time(delta_metric[1m]) / 1m` range query (with a 1m step) but the collection interval of the metric is 10m, the graph will show a single point every 10 minutes with a high rate value, rather than 10 points with a lower, constant value.
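The window arithmetic above can be sketched in Go (a hypothetical illustration, not Prometheus code — `deltaSample` and `perSecondRate` are made-up names): summing the delta samples that fall in the window and dividing by the window length gives the per-second rate, exactly what `sum_over_time(delta_metric[<range>]) / <range>` computes.

```go
package main

import "fmt"

// deltaSample is a hypothetical (timestamp, value) pair for a delta metric.
type deltaSample struct {
	tsSeconds int64
	value     float64
}

// perSecondRate mimics sum_over_time(delta_metric[range]) / range:
// sum every delta sample inside the (windowStart, windowEnd] window
// and divide by the window length in seconds.
func perSecondRate(samples []deltaSample, windowStart, windowEnd int64) float64 {
	var sum float64
	for _, s := range samples {
		if s.tsSeconds > windowStart && s.tsSeconds <= windowEnd {
			sum += s.value
		}
	}
	return sum / float64(windowEnd-windowStart)
}

func main() {
	// Three delta increments of 10, collected every 20s, inside a 60s window:
	// 30 increments over 60 seconds is a rate of 0.5/s.
	samples := []deltaSample{{20, 10}, {40, 10}, {60, 10}}
	fmt.Println(perSecondRate(samples, 0, 60)) // prints 0.5
}
```

The mismatched-interval problem described above follows directly: if all samples land in one window and the neighboring windows are empty, the neighboring windows report a rate of zero.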
### Current gotchas
* If delta metrics are exposed via [federation](https://prometheus.io/docs/prometheus/latest/federation/), data can be collected incorrectly if the ingestion interval is not the same as the scrape interval for the federated endpoint.
* It is difficult to figure out whether a metric has delta or cumulative temporality, since there is no indication of temporality in metric names or labels. For now, if you are ingesting a mix of delta and cumulative metrics, we advise you to explicitly add your own labels to distinguish them. In the future, we plan to introduce type labels to consistently distinguish metric types and potentially make PromQL functions type-aware (e.g. providing warnings when cumulative-only functions are used with delta metrics).
* If multiple samples are ingested at the same timestamp, only one of the points is kept - the samples are **not** summed together (this is how Prometheus works in general: duplicate-timestamp samples are rejected). Any aggregation has to be done before sending samples to Prometheus.
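The last gotcha means producers must pre-aggregate duplicates themselves. A minimal Go sketch of that pre-aggregation step (a hypothetical helper, not Prometheus code): sum delta values that share a timestamp before writing them.

```go
package main

import (
	"fmt"
	"sort"
)

// sample is a hypothetical (timestamp, value) pair for a delta metric.
type sample struct {
	tsMillis int64
	value    float64
}

// aggregateByTimestamp sums delta samples that share a timestamp, since
// Prometheus keeps only one sample per timestamp and never sums duplicates.
func aggregateByTimestamp(samples []sample) []sample {
	sums := make(map[int64]float64)
	for _, s := range samples {
		sums[s.tsMillis] += s.value
	}
	out := make([]sample, 0, len(sums))
	for ts, v := range sums {
		out = append(out, sample{tsMillis: ts, value: v})
	}
	// Sort by timestamp so the output order is deterministic.
	sort.Slice(out, func(i, j int) bool { return out[i].tsMillis < out[j].tsMillis })
	return out
}

func main() {
	// Two deltas at t=1000 collapse into a single sample with value 7.
	fmt.Println(aggregateByTimestamp([]sample{{1000, 3}, {1000, 4}, {2000, 1}}))
	// prints [{1000 7} {2000 1}]
}
```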


@@ -224,21 +224,19 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, setting
 	return labels
 }
-// isValidAggregationTemporality checks whether an OTel metric has a valid
-// aggregation temporality for conversion to a Prometheus metric.
-func isValidAggregationTemporality(metric pmetric.Metric) bool {
+func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporality, bool, error) {
 	//exhaustive:enforce
 	switch metric.Type() {
 	case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
-		return true
+		return 0, false, nil
 	case pmetric.MetricTypeSum:
-		return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Sum().AggregationTemporality(), true, nil
 	case pmetric.MetricTypeHistogram:
-		return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.Histogram().AggregationTemporality(), true, nil
 	case pmetric.MetricTypeExponentialHistogram:
-		return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
+		return metric.ExponentialHistogram().AggregationTemporality(), true, nil
 	}
-	return false
+	return 0, false, fmt.Errorf("could not get aggregation temporality for %s as it has unsupported metric type %s", metric.Name(), metric.Type())
 }
 // addHistogramDataPoints adds OTel histogram data points to the corresponding Prometheus time series
// addHistogramDataPoints adds OTel histogram data points to the corresponding Prometheus time series


@@ -37,6 +37,7 @@ const defaultZeroThreshold = 1e-128
 // as native histogram samples.
 func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice,
 	resource pcommon.Resource, settings Settings, promName string,
+	temporality pmetric.AggregationTemporality,
 ) (annotations.Annotations, error) {
 	var annots annotations.Annotations
 	for x := 0; x < dataPoints.Len(); x++ {
@@ -46,7 +47,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
 		pt := dataPoints.At(x)
-		histogram, ws, err := exponentialToNativeHistogram(pt)
+		histogram, ws, err := exponentialToNativeHistogram(pt, temporality)
 		annots.Merge(ws)
 		if err != nil {
 			return annots, err
@@ -76,7 +77,7 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
 // exponentialToNativeHistogram translates an OTel Exponential Histogram data point
 // to a Prometheus Native Histogram.
-func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) {
+func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (prompb.Histogram, annotations.Annotations, error) {
 	var annots annotations.Annotations
 	scale := p.Scale()
 	if scale < -4 {
@@ -94,17 +95,27 @@ func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prom
 	pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
 	nSpans, nDeltas := convertBucketsLayout(p.Negative().BucketCounts().AsRaw(), p.Negative().Offset(), scaleDown, true)
+	// The counter reset detection must be compatible with Prometheus to
+	// safely set ResetHint to NO. This is not ensured currently.
+	// Sending a sample that triggers counter reset but with ResetHint==NO
+	// would lead to Prometheus panic as it does not double check the hint.
+	// Thus we're explicitly saying UNKNOWN here, which is always safe.
+	// TODO: using created time stamp should be accurate, but we
+	// need to know here if it was used for the detection.
+	// Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303
+	// Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232
+	resetHint := prompb.Histogram_UNKNOWN
+	if temporality == pmetric.AggregationTemporalityDelta {
+		// If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting.
+		// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/).
+		// This might be changed to a different hint name as gauge type might be misleading for samples that should be
+		// summed over time.
+		resetHint = prompb.Histogram_GAUGE
+	}
 	h := prompb.Histogram{
-		// The counter reset detection must be compatible with Prometheus to
-		// safely set ResetHint to NO. This is not ensured currently.
-		// Sending a sample that triggers counter reset but with ResetHint==NO
-		// would lead to Prometheus panic as it does not double check the hint.
-		// Thus we're explicitly saying UNKNOWN here, which is always safe.
-		// TODO: using created time stamp should be accurate, but we
-		// need to know here if it was used for the detection.
-		// Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303
-		// Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232
-		ResetHint: prompb.Histogram_UNKNOWN,
+		ResetHint: resetHint,
 		Schema:    scale,
 		ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()},
@@ -242,6 +253,7 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust
 func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
 	resource pcommon.Resource, settings Settings, promName string,
+	temporality pmetric.AggregationTemporality,
 ) (annotations.Annotations, error) {
 	var annots annotations.Annotations
@@ -252,7 +264,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
 		pt := dataPoints.At(x)
-		histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt)
+		histogram, ws, err := explicitHistogramToCustomBucketsHistogram(pt, temporality)
 		annots.Merge(ws)
 		if err != nil {
 			return annots, err
@@ -281,7 +293,7 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
 	return annots, nil
 }
-func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) {
+func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint, temporality pmetric.AggregationTemporality) (prompb.Histogram, annotations.Annotations, error) {
 	var annots annotations.Annotations
 	buckets := p.BucketCounts().AsRaw()
@@ -289,18 +301,28 @@ func explicitHistogramToCustomBucketsHistogram(p pmetric.HistogramDataPoint) (pr
 	bucketCounts := buckets[offset:]
 	positiveSpans, positiveDeltas := convertBucketsLayout(bucketCounts, int32(offset), 0, false)
+	// The counter reset detection must be compatible with Prometheus to
+	// safely set ResetHint to NO. This is not ensured currently.
+	// Sending a sample that triggers counter reset but with ResetHint==NO
+	// would lead to Prometheus panic as it does not double check the hint.
+	// Thus we're explicitly saying UNKNOWN here, which is always safe.
+	// TODO: using created time stamp should be accurate, but we
+	// need to know here if it was used for the detection.
+	// Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303
+	// Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232
+	resetHint := prompb.Histogram_UNKNOWN
+	if temporality == pmetric.AggregationTemporalityDelta {
+		// If the histogram has delta temporality, set the reset hint to gauge to avoid unnecessary chunk cutting.
+		// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/).
+		// This might be changed to a different hint name as gauge type might be misleading for samples that should be
+		// summed over time.
+		resetHint = prompb.Histogram_GAUGE
+	}
 	// TODO(carrieedwards): Add setting to limit maximum bucket count
 	h := prompb.Histogram{
-		// The counter reset detection must be compatible with Prometheus to
-		// safely set ResetHint to NO. This is not ensured currently.
-		// Sending a sample that triggers counter reset but with ResetHint==NO
-		// would lead to Prometheus panic as it does not double check the hint.
-		// Thus we're explicitly saying UNKNOWN here, which is always safe.
-		// TODO: using created time stamp should be accurate, but we
-		// need to know here if it was used for the detection.
-		// Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303
-		// Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232
-		ResetHint: prompb.Histogram_UNKNOWN,
+		ResetHint:     resetHint,
 		Schema:        histogram.CustomBucketsSchema,
 		PositiveSpans: positiveSpans,
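The hint selection duplicated across both histogram translation paths boils down to one small decision; a standalone sketch below uses local string constants as stand-ins for the `prompb.Histogram_ResetHint` and `pmetric.AggregationTemporality` values (`resetHintFor` and the constants are hypothetical, for illustration only):

```go
package main

import "fmt"

// Local stand-ins for prompb reset hints and pmetric temporality values
// (hypothetical names, for illustration only).
const (
	hintUnknown = "UNKNOWN"
	hintGauge   = "GAUGE"

	temporalityCumulative = "cumulative"
	temporalityDelta      = "delta"
)

// resetHintFor mirrors the logic in both histogram translation paths:
// delta samples get GAUGE so the appender avoids unnecessary chunk
// cutting, while everything else stays UNKNOWN, which is always safe
// because counter-reset detection cannot be trusted from the producer side.
func resetHintFor(temporality string) string {
	if temporality == temporalityDelta {
		return hintGauge
	}
	return hintUnknown
}

func main() {
	fmt.Println(resetHintFor(temporalityDelta))      // prints GAUGE
	fmt.Println(resetHintFor(temporalityCumulative)) // prints UNKNOWN
}
```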


@@ -566,7 +566,7 @@ func TestExponentialToNativeHistogram(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			validateExponentialHistogramCount(t, tt.exponentialHist()) // Sanity check.
-			got, annots, err := exponentialToNativeHistogram(tt.exponentialHist())
+			got, annots, err := exponentialToNativeHistogram(tt.exponentialHist(), pmetric.AggregationTemporalityCumulative)
 			if tt.wantErrMessage != "" {
 				require.ErrorContains(t, err, tt.wantErrMessage)
 				return
@@ -769,6 +769,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
 					ExportCreatedMetric: true,
 				},
 				otlptranslator.BuildCompliantMetricName(metric, "", true),
+				pmetric.AggregationTemporalityCumulative,
 			)
 			require.NoError(t, err)
 			require.Empty(t, annots)
@@ -972,7 +973,7 @@ func TestHistogramToCustomBucketsHistogram(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			validateHistogramCount(t, tt.hist())
-			got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist())
+			got, annots, err := explicitHistogramToCustomBucketsHistogram(tt.hist(), pmetric.AggregationTemporalityCumulative)
 			if tt.wantErrMessage != "" {
 				require.ErrorContains(t, err, tt.wantErrMessage)
 				return
@@ -1137,6 +1138,7 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) {
 					ConvertHistogramsToNHCB: true,
 				},
 				otlptranslator.BuildCompliantMetricName(metric, "", true),
+				pmetric.AggregationTemporalityCumulative,
 			)
 			require.NoError(t, err)


@@ -41,6 +41,7 @@ type Settings struct {
 	PromoteResourceAttributes         []string
 	KeepIdentifyingResourceAttributes bool
 	ConvertHistogramsToNHCB           bool
+	AllowDeltaTemporality             bool
 }
 // PrometheusConverter converts from OTel write format to Prometheus remote write format.
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
@@ -91,8 +92,18 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
 			metric := metricSlice.At(k)
 			mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
+			temporality, hasTemporality, err := aggregationTemporality(metric)
+			if err != nil {
+				errs = multierr.Append(errs, err)
+				continue
+			}
-			if !isValidAggregationTemporality(metric) {
+			if hasTemporality &&
+				// Cumulative temporality is always valid.
+				// Delta temporality is also valid if AllowDeltaTemporality is true.
+				// All other temporality values are invalid.
+				!(temporality == pmetric.AggregationTemporalityCumulative ||
+					(settings.AllowDeltaTemporality && temporality == pmetric.AggregationTemporalityDelta)) {
 				errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
 				continue
 			}
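The acceptance rule in this hunk reduces to a two-clause predicate; the standalone sketch below uses a local enum as a stand-in for `pmetric.AggregationTemporality` (`accepted` and the `temporality` type are hypothetical names, for illustration only):

```go
package main

import "fmt"

// temporality is a local stand-in for pmetric.AggregationTemporality
// (hypothetical, for illustration only).
type temporality int

const (
	temporalityUnspecified temporality = iota
	temporalityDelta
	temporalityCumulative
)

// accepted mirrors the FromMetrics check: cumulative is always valid,
// delta is valid only when AllowDeltaTemporality is set, and every other
// temporality (including unspecified) is rejected.
func accepted(t temporality, allowDelta bool) bool {
	return t == temporalityCumulative || (allowDelta && t == temporalityDelta)
}

func main() {
	fmt.Println(accepted(temporalityCumulative, false)) // prints true
	fmt.Println(accepted(temporalityDelta, false))      // prints false
	fmt.Println(accepted(temporalityDelta, true))       // prints true
	fmt.Println(accepted(temporalityUnspecified, true)) // prints false
}
```

Metrics without a temporality at all (gauges, summaries) bypass this predicate entirely, which is why the real check is also guarded by `hasTemporality`.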
@@ -144,7 +155,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
 					break
 				}
 				if settings.ConvertHistogramsToNHCB {
-					ws, err := c.addCustomBucketsHistogramDataPoints(ctx, dataPoints, resource, settings, promName)
+					ws, err := c.addCustomBucketsHistogramDataPoints(ctx, dataPoints, resource, settings, promName, temporality)
 					annots.Merge(ws)
 					if err != nil {
 						errs = multierr.Append(errs, err)
@@ -172,6 +183,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
 					resource,
 					settings,
 					promName,
+					temporality,
 				)
 				annots.Merge(ws)
 				if err != nil {


@@ -19,6 +19,7 @@ package prometheusremotewrite
 import (
 	"context"
 	"fmt"
+	"sort"
 	"testing"
 	"time"
@@ -31,6 +32,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/prompb"
+	"github.com/prometheus/prometheus/util/testutil"
 )
func TestFromMetrics(t *testing.T) {
@@ -235,6 +237,461 @@ func TestFromMetrics(t *testing.T) {
})
}
func TestTemporality(t *testing.T) {
ts := time.Unix(100, 0)
tests := []struct {
name string
allowDelta bool
convertToNHCB bool
inputSeries []pmetric.Metric
expectedSeries []prompb.TimeSeries
expectedError string
}{
{
name: "all cumulative when delta not allowed",
allowDelta: false,
inputSeries: []pmetric.Metric{
createOtelSum("test_metric_1", pmetric.AggregationTemporalityCumulative, ts),
createOtelSum("test_metric_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_metric_1", ts),
createPromFloatSeries("test_metric_2", ts),
},
},
{
name: "all delta when allowed",
allowDelta: true,
inputSeries: []pmetric.Metric{
createOtelSum("test_metric_1", pmetric.AggregationTemporalityDelta, ts),
createOtelSum("test_metric_2", pmetric.AggregationTemporalityDelta, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_metric_1", ts),
createPromFloatSeries("test_metric_2", ts),
},
},
{
name: "mixed temporality when delta allowed",
allowDelta: true,
inputSeries: []pmetric.Metric{
createOtelSum("test_metric_1", pmetric.AggregationTemporalityDelta, ts),
createOtelSum("test_metric_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_metric_1", ts),
createPromFloatSeries("test_metric_2", ts),
},
},
{
name: "delta rejected when not allowed",
allowDelta: false,
inputSeries: []pmetric.Metric{
createOtelSum("test_metric_1", pmetric.AggregationTemporalityCumulative, ts),
createOtelSum("test_metric_2", pmetric.AggregationTemporalityDelta, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_metric_1", ts),
},
expectedError: `invalid temporality and type combination for metric "test_metric_2"`,
},
{
name: "unspecified temporality not allowed",
allowDelta: true,
inputSeries: []pmetric.Metric{
createOtelSum("test_metric_1", pmetric.AggregationTemporalityCumulative, ts),
createOtelSum("test_metric_2", pmetric.AggregationTemporalityUnspecified, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_metric_1", ts),
},
expectedError: `invalid temporality and type combination for metric "test_metric_2"`,
},
{
name: "cumulative histogram",
allowDelta: false,
inputSeries: []pmetric.Metric{
createOtelExponentialHistogram("test_histogram", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNativeHistogramSeries("test_histogram", prompb.Histogram_UNKNOWN, ts),
},
},
{
name: "delta histogram when allowed",
allowDelta: true,
inputSeries: []pmetric.Metric{
createOtelExponentialHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExponentialHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNativeHistogramSeries("test_histogram_1", prompb.Histogram_GAUGE, ts),
createPromNativeHistogramSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts),
},
},
{
name: "delta histogram when not allowed",
allowDelta: false,
inputSeries: []pmetric.Metric{
createOtelExponentialHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExponentialHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNativeHistogramSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts),
},
expectedError: `invalid temporality and type combination for metric "test_histogram_1"`,
},
{
name: "cumulative histogram with buckets",
allowDelta: false,
convertToNHCB: true,
inputSeries: []pmetric.Metric{
createOtelExplicitHistogram("test_histogram", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNHCBSeries("test_histogram", prompb.Histogram_UNKNOWN, ts),
},
},
{
name: "delta histogram with buckets when allowed",
allowDelta: true,
convertToNHCB: true,
inputSeries: []pmetric.Metric{
createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNHCBSeries("test_histogram_1", prompb.Histogram_GAUGE, ts),
createPromNHCBSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts),
},
},
{
name: "delta histogram with buckets when not allowed",
allowDelta: false,
convertToNHCB: true,
inputSeries: []pmetric.Metric{
createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: []prompb.TimeSeries{
createPromNHCBSeries("test_histogram_2", prompb.Histogram_UNKNOWN, ts),
},
expectedError: `invalid temporality and type combination for metric "test_histogram_1"`,
},
{
name: "delta histogram with buckets and convertToNHCB=false when not allowed",
allowDelta: false,
convertToNHCB: false,
inputSeries: []pmetric.Metric{
createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: createPromClassicHistogramSeries("test_histogram_2", ts),
expectedError: `invalid temporality and type combination for metric "test_histogram_1"`,
},
{
name: "delta histogram with buckets and convertToNHCB=false when allowed",
allowDelta: true,
convertToNHCB: false,
inputSeries: []pmetric.Metric{
createOtelExplicitHistogram("test_histogram_1", pmetric.AggregationTemporalityDelta, ts),
createOtelExplicitHistogram("test_histogram_2", pmetric.AggregationTemporalityCumulative, ts),
},
expectedSeries: append(
createPromClassicHistogramSeries("test_histogram_1", ts),
createPromClassicHistogramSeries("test_histogram_2", ts)...,
),
},
{
name: "summary does not have temporality",
inputSeries: []pmetric.Metric{
createOtelSummary("test_summary_1", ts),
},
expectedSeries: createPromSummarySeries("test_summary_1", ts),
},
{
name: "gauge does not have temporality",
inputSeries: []pmetric.Metric{
createOtelGauge("test_gauge_1", ts),
},
expectedSeries: []prompb.TimeSeries{
createPromFloatSeries("test_gauge_1", ts),
},
},
{
name: "empty metric type errors",
inputSeries: []pmetric.Metric{
createOtelEmptyType("test_empty"),
},
expectedSeries: []prompb.TimeSeries{},
expectedError: `could not get aggregation temporality for test_empty as it has unsupported metric type Empty`,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
for _, s := range tc.inputSeries {
s.CopyTo(sm.Metrics().AppendEmpty())
}
c := NewPrometheusConverter()
settings := Settings{
AllowDeltaTemporality: tc.allowDelta,
ConvertHistogramsToNHCB: tc.convertToNHCB,
}
_, err := c.FromMetrics(context.Background(), metrics, settings)
if tc.expectedError != "" {
require.EqualError(t, err, tc.expectedError)
} else {
require.NoError(t, err)
}
series := c.TimeSeries()
// Sort series to make the test deterministic.
testutil.RequireEqual(t, sortTimeSeries(tc.expectedSeries), sortTimeSeries(series))
})
}
}
func createOtelSum(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
sum := m.SetEmptySum()
sum.SetAggregationTemporality(temporality)
dp := sum.DataPoints().AppendEmpty()
dp.SetDoubleValue(5)
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.Attributes().PutStr("test_label", "test_value")
return m
}
func createPromFloatSeries(name string, ts time.Time) prompb.TimeSeries {
return prompb.TimeSeries{
Labels: []prompb.Label{
{Name: "__name__", Value: name},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{
Value: 5,
Timestamp: ts.UnixMilli(),
}},
}
}
func createOtelGauge(name string, ts time.Time) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
gauge := m.SetEmptyGauge()
dp := gauge.DataPoints().AppendEmpty()
dp.SetDoubleValue(5)
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.Attributes().PutStr("test_label", "test_value")
return m
}
func createOtelExponentialHistogram(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
hist := m.SetEmptyExponentialHistogram()
hist.SetAggregationTemporality(temporality)
dp := hist.DataPoints().AppendEmpty()
dp.SetCount(1)
dp.SetSum(5)
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.Attributes().PutStr("test_label", "test_value")
return m
}
func createPromNativeHistogramSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries {
return prompb.TimeSeries{
Labels: []prompb.Label{
{Name: "__name__", Value: name},
{Name: "test_label", Value: "test_value"},
},
Histograms: []prompb.Histogram{
{
Count: &prompb.Histogram_CountInt{CountInt: 1},
Sum: 5,
Schema: 0,
ZeroThreshold: 1e-128,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
Timestamp: ts.UnixMilli(),
ResetHint: hint,
},
},
}
}
func createOtelExplicitHistogram(name string, temporality pmetric.AggregationTemporality, ts time.Time) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
hist := m.SetEmptyHistogram()
hist.SetAggregationTemporality(temporality)
dp := hist.DataPoints().AppendEmpty()
dp.SetCount(20)
dp.SetSum(30)
dp.BucketCounts().FromRaw([]uint64{10, 10, 0})
dp.ExplicitBounds().FromRaw([]float64{1, 2})
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.Attributes().PutStr("test_label", "test_value")
return m
}
func createPromNHCBSeries(name string, hint prompb.Histogram_ResetHint, ts time.Time) prompb.TimeSeries {
return prompb.TimeSeries{
Labels: []prompb.Label{
{Name: "__name__", Value: name},
{Name: "test_label", Value: "test_value"},
},
Histograms: []prompb.Histogram{
{
Count: &prompb.Histogram_CountInt{CountInt: 20},
Sum: 30,
Schema: -53,
ZeroThreshold: 0,
ZeroCount: nil,
PositiveSpans: []prompb.BucketSpan{
{
Length: 3,
},
},
PositiveDeltas: []int64{10, 0, -10},
CustomValues: []float64{1, 2},
Timestamp: ts.UnixMilli(),
ResetHint: hint,
},
},
}
}
func createPromClassicHistogramSeries(name string, ts time.Time) []prompb.TimeSeries {
return []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_bucket"},
{Name: "le", Value: "1"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{Value: 10, Timestamp: ts.UnixMilli()}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_bucket"},
{Name: "le", Value: "2"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_bucket"},
{Name: "le", Value: "+Inf"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_count"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{Value: 20, Timestamp: ts.UnixMilli()}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_sum"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{Value: 30, Timestamp: ts.UnixMilli()}},
},
}
}
func createOtelSummary(name string, ts time.Time) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
summary := m.SetEmptySummary()
dp := summary.DataPoints().AppendEmpty()
dp.SetCount(9)
dp.SetSum(18)
qv := dp.QuantileValues().AppendEmpty()
qv.SetQuantile(0.5)
qv.SetValue(2)
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.Attributes().PutStr("test_label", "test_value")
return m
}
func createPromSummarySeries(name string, ts time.Time) []prompb.TimeSeries {
return []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_sum"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{
Value: 18,
Timestamp: ts.UnixMilli(),
}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name + "_count"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{
Value: 9,
Timestamp: ts.UnixMilli(),
}},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: name},
{Name: "quantile", Value: "0.5"},
{Name: "test_label", Value: "test_value"},
},
Samples: []prompb.Sample{{
Value: 2,
Timestamp: ts.UnixMilli(),
}},
},
}
}
func createOtelEmptyType(name string) pmetric.Metric {
metrics := pmetric.NewMetricSlice()
m := metrics.AppendEmpty()
m.SetName(name)
return m
}
func sortTimeSeries(series []prompb.TimeSeries) []prompb.TimeSeries {
for i := range series {
sort.Slice(series[i].Labels, func(j, k int) bool {
return series[i].Labels[j].Name < series[i].Labels[k].Name
})
}
sort.Slice(series, func(i, j int) bool {
return fmt.Sprint(series[i].Labels) < fmt.Sprint(series[j].Labels)
})
return series
}
func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
for _, resourceAttributeCount := range []int{0, 5, 50} {
b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) {

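The trick in `sortTimeSeries` above — canonicalize label order inside each series, then order the series by the string form of their label sets — can be sketched in isolation. This is an illustrative stand-in using plain string pairs instead of `prompb.TimeSeries`/`prompb.Label`; the names `series` and `sortSeries` are hypothetical:

```go
package main

import (
	"fmt"
	"sort"
)

// series is a plain stand-in for prompb.TimeSeries: just its label pairs.
type series struct {
	labels [][2]string // {name, value} pairs
}

// sortSeries mirrors the sortTimeSeries helper: first sort the labels inside
// each series by name, then sort the series themselves by the string form of
// their (now canonical) label sets, making test comparisons deterministic.
func sortSeries(ss []series) []series {
	for i := range ss {
		sort.Slice(ss[i].labels, func(j, k int) bool {
			return ss[i].labels[j][0] < ss[i].labels[k][0]
		})
	}
	sort.Slice(ss, func(i, j int) bool {
		return fmt.Sprint(ss[i].labels) < fmt.Sprint(ss[j].labels)
	})
	return ss
}

func main() {
	ss := sortSeries([]series{
		{labels: [][2]string{{"le", "1"}, {"__name__", "b_bucket"}}},
		{labels: [][2]string{{"__name__", "a_sum"}}},
	})
	for _, s := range ss {
		fmt.Println(s.labels) // the a_sum series sorts first
	}
}
```

Using `fmt.Sprint` of the sorted label slice as the sort key is cheap and good enough for tests, though it would be fragile as a production ordering.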

@@ -31,12 +31,27 @@ func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMeta
 		if otelMetric.Sum().IsMonotonic() {
 			metricType = prompb.MetricMetadata_COUNTER
 		}
+		// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
+		// We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now.
+		if otelMetric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityDelta {
+			metricType = prompb.MetricMetadata_UNKNOWN
+		}
 		return metricType
 	case pmetric.MetricTypeHistogram:
+		// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
+		// We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now.
+		if otelMetric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta {
+			return prompb.MetricMetadata_UNKNOWN
+		}
 		return prompb.MetricMetadata_HISTOGRAM
 	case pmetric.MetricTypeSummary:
 		return prompb.MetricMetadata_SUMMARY
 	case pmetric.MetricTypeExponentialHistogram:
+		if otelMetric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityDelta {
+			// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
+			// We don't have a proper way to flag delta metrics yet, therefore marking the metric type as unknown for now.
+			return prompb.MetricMetadata_UNKNOWN
+		}
 		return prompb.MetricMetadata_HISTOGRAM
 	}
 	return prompb.MetricMetadata_UNKNOWN
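The rule this hunk implements — any delta-temporal sum, histogram, or exponential histogram is reported with unknown metadata type until the delta proposal lands — can be condensed into a table-like sketch. Plain strings stand in for the `pmetric`/`prompb` enums here; `promTypeFor` is an illustrative name, not the real signature:

```go
package main

import "fmt"

// promTypeFor mirrors the rule in otelMetricTypeToPromMetricType: delta
// temporality downgrades the reported metadata type to "unknown", since
// Prometheus has no delta metric type yet.
func promTypeFor(metricType, temporality string, monotonic bool) string {
	switch metricType {
	case "sum":
		t := "gauge"
		if monotonic {
			t = "counter"
		}
		if temporality == "delta" {
			t = "unknown" // no proper way to flag delta metrics yet
		}
		return t
	case "histogram", "exponential_histogram":
		if temporality == "delta" {
			return "unknown"
		}
		return "histogram"
	case "summary":
		return "summary"
	}
	return "unknown"
}

func main() {
	fmt.Println(promTypeFor("sum", "delta", true))      // unknown
	fmt.Println(promTypeFor("sum", "cumulative", true)) // counter
	fmt.Println(promTypeFor("histogram", "delta", false)) // unknown
}
```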


@@ -526,20 +526,30 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora
 type OTLPOptions struct {
 	// Convert delta samples to their cumulative equivalent by aggregating in-memory
 	ConvertDelta bool
+	// Store the raw delta samples as metrics with unknown type (we don't have a proper type for delta yet, therefore
+	// marking the metric type as unknown for now).
+	// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/)
+	NativeDelta bool
 }
 
 // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
 // writes them to the provided appendable.
 func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendable storage.Appendable, configFunc func() config.Config, opts OTLPOptions) http.Handler {
+	if opts.NativeDelta && opts.ConvertDelta {
+		// This should be validated when iterating through feature flags, so not expected to fail here.
+		panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
+	}
+
 	ex := &rwExporter{
 		writeHandler: &writeHandler{
 			logger:     logger,
 			appendable: appendable,
 		},
-		config: configFunc,
+		config:                configFunc,
+		allowDeltaTemporality: opts.NativeDelta,
 	}
 
-	wh := &otlpWriteHandler{logger: logger, cumul: ex}
+	wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex}
 	if opts.ConvertDelta {
 		fac := deltatocumulative.NewFactory()
@@ -547,7 +557,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
 		ID:                component.NewID(fac.Type()),
 		TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()},
 	}
-	d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.cumul)
+	d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.defaultConsumer)
 	if err != nil {
 		// fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor],
 		// which only errors if:
@@ -563,7 +573,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
 			// deltatocumulative does not error on start. see above for panic reasoning
 			panic(err)
 		}
-		wh.delta = d2c
+		wh.d2cConsumer = d2c
 	}
 
 	return wh
@@ -571,7 +581,8 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
 
 type rwExporter struct {
 	*writeHandler
-	config func() config.Config
+	config                func() config.Config
+	allowDeltaTemporality bool
 }
 
 func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
@@ -584,6 +595,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er
 		PromoteResourceAttributes:         otlpCfg.PromoteResourceAttributes,
 		KeepIdentifyingResourceAttributes: otlpCfg.KeepIdentifyingResourceAttributes,
 		ConvertHistogramsToNHCB:           otlpCfg.ConvertHistogramsToNHCB,
+		AllowDeltaTemporality:             rw.allowDeltaTemporality,
 	})
 	if err != nil {
 		rw.logger.Warn("Error translating OTLP metrics to Prometheus write request", "err", err)
@@ -607,8 +619,8 @@
 type otlpWriteHandler struct {
 	logger *slog.Logger
 
-	cumul consumer.Metrics // only cumulative
-	delta consumer.Metrics // delta capable
+	defaultConsumer consumer.Metrics // stores deltas as-is
+	d2cConsumer     consumer.Metrics // converts deltas to cumulative
 }
 
 func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -620,13 +632,15 @@
 	}
 
 	md := req.Metrics()
-	// if delta conversion enabled AND delta samples exist, use slower delta capable path
-	if h.delta != nil && hasDelta(md) {
-		err = h.delta.ConsumeMetrics(r.Context(), md)
+	// If deltatocumulative conversion enabled AND delta samples exist, use slower conversion path.
+	// While deltatocumulative can also accept cumulative metrics (and then just forwards them as-is), it currently
+	// holds a sync.Mutex when entering ConsumeMetrics. This is slow and not necessary when ingesting cumulative metrics.
+	if h.d2cConsumer != nil && hasDelta(md) {
+		err = h.d2cConsumer.ConsumeMetrics(r.Context(), md)
 	} else {
-		// deltatocumulative currently holds a sync.Mutex when entering ConsumeMetrics.
-		// This is slow and not necessary when no delta samples exist anyways
-		err = h.cumul.ConsumeMetrics(r.Context(), md)
+		// Otherwise use default consumer (alongside cumulative samples, this will accept delta samples and write as-is
+		// if native-delta-support is enabled).
+		err = h.defaultConsumer.ConsumeMetrics(r.Context(), md)
 	}
 
 	switch {

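`hasDelta` itself is not shown in the hunk; it only needs to answer whether any data point in the request carries delta temporality, so that the slower, mutex-guarded deltatocumulative path is taken only when actually needed. A simplified, self-contained sketch of that routing decision, with plain structs standing in for `pmetric.Metrics` and `consumer.Metrics` (all names here are illustrative):

```go
package main

import "fmt"

// metric stands in for one pmetric.Metric, reduced to its temporality.
type metric struct{ temporality string }

// hasDelta reports whether any metric in the batch uses delta temporality,
// mirroring the check ServeHTTP uses before picking a consumer.
func hasDelta(ms []metric) bool {
	for _, m := range ms {
		if m.temporality == "delta" {
			return true
		}
	}
	return false
}

// route names the consumer ServeHTTP would pick: the deltatocumulative path
// only when conversion is enabled AND the batch actually contains deltas,
// since that path holds a sync.Mutex in ConsumeMetrics and is slower.
func route(d2cEnabled bool, ms []metric) string {
	if d2cEnabled && hasDelta(ms) {
		return "d2cConsumer"
	}
	return "defaultConsumer"
}

func main() {
	batch := []metric{{temporality: "cumulative"}, {temporality: "delta"}}
	fmt.Println(route(true, batch))                                 // d2cConsumer
	fmt.Println(route(true, []metric{{temporality: "cumulative"}})) // defaultConsumer
	fmt.Println(route(false, batch))                                // defaultConsumer
}
```

Note the asymmetry: with conversion disabled, delta samples still flow to the default consumer, which writes them as-is only when native delta ingestion is enabled.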

@@ -262,7 +262,7 @@ func NewAPI(
 	statsRenderer StatsRenderer,
 	rwEnabled bool,
 	acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
-	otlpEnabled, otlpDeltaToCumulative bool,
+	otlpEnabled, otlpDeltaToCumulative, otlpNativeDeltaIngestion bool,
 	ctZeroIngestionEnabled bool,
 ) *API {
 	a := &API{
@@ -310,7 +310,7 @@
 		a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
 	}
 	if otlpEnabled {
-		a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative})
+		a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ConvertDelta: otlpDeltaToCumulative, NativeDelta: otlpNativeDeltaIngestion})
 	}
 
 	return a

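The panic in `NewOTLPWriteHandler` assumes the flag combination was already rejected "when iterating through feature flags". A hedged sketch of what such an up-front check looks like — this is an illustrative function, not the real Prometheus flag-parsing code, and the error text simply mirrors the panic message from the handler:

```go
package main

import (
	"errors"
	"fmt"
)

// validateOTLPDeltaFlags is an illustrative stand-in for the feature-flag
// validation the handler's panic relies on: native delta ingestion and
// delta-to-cumulative conversion are mutually exclusive, so the combination
// should be rejected during flag parsing, long before the handler is built.
func validateOTLPDeltaFlags(convertDelta, nativeDelta bool) error {
	if convertDelta && nativeDelta {
		return errors.New("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
	}
	return nil
}

func main() {
	fmt.Println(validateOTLPDeltaFlags(true, true))  // rejected with an error
	fmt.Println(validateOTLPDeltaFlags(false, true)) // <nil>
}
```

Failing fast at flag-parsing time turns a confusing runtime panic into a clear startup error.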

@@ -144,6 +144,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
 		false,
 		false,
 		false,
+		false,
 	)
 
 	promRouter := route.New().WithPrefix("/api/v1")


@@ -290,6 +290,7 @@
 	EnableRemoteWriteReceiver bool
 	EnableOTLPWriteReceiver   bool
 	ConvertOTLPDelta          bool
+	NativeOTLPDeltaIngestion  bool
 	IsAgent                   bool
 	CTZeroIngestionEnabled    bool
 	AppName                   string
@@ -389,6 +390,7 @@
 		o.AcceptRemoteWriteProtoMsgs,
 		o.EnableOTLPWriteReceiver,
 		o.ConvertOTLPDelta,
+		o.NativeOTLPDeltaIngestion,
 		o.CTZeroIngestionEnabled,
 	)