promql: Simplify avg aggregation and avg_over_time

As it turns out, if we combine Kahan summation and incremental mean
calculation properly, it works quite well and we do not need to switch
between simple mean calculation and incremental calculation based on
overflow.

This simplifies the code quite a bit.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2025-05-07 15:59:48 +02:00
parent 41594cebb4
commit ff67596a82
2 changed files with 40 additions and 87 deletions

View File

@ -2981,7 +2981,6 @@ type groupedAggregation struct {
hasHistogram bool // Has at least 1 histogram sample aggregated.
incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets, or incompatible custom buckets.
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group.
incrementalMean bool // True after reverting to incremental calculation of the mean value.
}
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@ -3080,6 +3079,21 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
}
case parser.AVG:
// For the average calculation, we use incremental mean
// calculation. In particular in combination with Kahan
// summation (which we do for floats, but not yet for
// histograms, see issue #14105), this is quite accurate
// and only breaks in extreme cases (see testdata for
// avg_over_time). One might assume that simple direct
// mean calculation works better in some cases, but so
// far, our conclusion is that we fare best with the
// incremental approach plus Kahan summation (for
// floats). For a relevant discussion, see
// https://stackoverflow.com/questions/61665473/is-it-beneficial-for-precision-to-calculate-the-incremental-mean-average
// Additional note: For even better numerical accuracy,
// we would need to process the values in a particular
// order, but that would be very hard to implement given
// how the PromQL engine works.
group.groupCount++
if h != nil {
group.hasHistogram = true
@ -3104,45 +3118,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// point in copying the histogram in that case.
} else {
group.hasFloat = true
if !group.incrementalMean {
newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC)
if !math.IsInf(newV, 0) {
// The sum doesn't overflow, so we propagate it to the
// group struct and continue with the regular
// calculation of the mean value.
group.floatValue, group.floatKahanC = newV, newC
break
}
// If we are here, we know that the sum _would_ overflow. So
// instead of continue to sum up, we revert to incremental
// calculation of the mean value from here on.
group.incrementalMean = true
group.floatMean = group.floatValue / (group.groupCount - 1)
group.floatKahanC /= group.groupCount - 1
}
if math.IsInf(group.floatMean, 0) {
if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) {
// The `floatMean` and `s.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `floatMean` is correct
// already.
break
}
if !math.IsInf(f, 0) && !math.IsNaN(f) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
break
}
}
currentMean := group.floatMean + group.floatKahanC
q := (group.groupCount - 1) / group.groupCount
group.floatMean, group.floatKahanC = kahanSumInc(
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
f/group.groupCount-currentMean/group.groupCount,
group.floatMean,
group.floatKahanC,
f/group.groupCount,
q*group.floatMean,
q*group.floatKahanC,
)
}
@ -3215,10 +3195,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
continue
case aggr.hasHistogram:
aggr.histogramValue = aggr.histogramValue.Compact(0)
case aggr.incrementalMean:
aggr.floatValue = aggr.floatMean + aggr.floatKahanC
default:
aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount
aggr.floatValue = aggr.floatMean + aggr.floatKahanC
}
case parser.COUNT:

View File

@ -671,6 +671,20 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
metricName := firstSeries.Metric.Get(labels.MetricName)
return enh.Out, annotations.New().Add(annotations.NewMixedFloatsHistogramsWarning(metricName, args[0].PositionRange()))
}
// For the average calculation, we use incremental mean calculation. In
// particular in combination with Kahan summation (which we do for
// floats, but not yet for histograms, see issue #14105), this is quite
// accurate and only breaks in extreme cases (see testdata). One might
// assume that simple direct mean calculation works better in some
// cases, but so far, our conclusion is that we fare best with the
// incremental approach plus Kahan summation (for floats). For a
// relevant discussion, see
// https://stackoverflow.com/questions/61665473/is-it-beneficial-for-precision-to-calculate-the-incremental-mean-average
// Additional note: For even better numerical accuracy, we would need to
// process the values in a particular order. For avg_over_time, that
// would be more or less feasible, but it would be more expensivo, and
// it would also be much harder for the avg aggregator, given how the
// PromQL engine works.
if len(firstSeries.Floats) == 0 {
// The passed values only contain histograms.
vec, err := aggrHistOverTime(vals, enh, func(s Series) (*histogram.FloatHistogram, error) {
@ -702,52 +716,13 @@ func funcAvgOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
return vec, nil
}
return aggrOverTime(vals, enh, func(s Series) float64 {
var (
sum, mean, count, kahanC float64
incrementalMean bool
)
for _, f := range s.Floats {
count++
if !incrementalMean {
newSum, newC := kahanSumInc(f.F, sum, kahanC)
// Perform regular mean calculation as long as
// the sum doesn't overflow and (in any case)
// for the first iteration (even if we start
// with ±Inf) to not run into division-by-zero
// problems below.
if count == 1 || !math.IsInf(newSum, 0) {
sum, kahanC = newSum, newC
continue
}
// Handle overflow by reverting to incremental calculation of the mean value.
incrementalMean = true
mean = sum / (count - 1)
kahanC /= count - 1
}
if math.IsInf(mean, 0) {
if math.IsInf(f.F, 0) && (mean > 0) == (f.F > 0) {
// The `mean` and `f.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `mean` is correct
// already.
continue
}
if !math.IsInf(f.F, 0) && !math.IsNaN(f.F) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
continue
}
}
correctedMean := mean + kahanC
mean, kahanC = kahanSumInc(f.F/count-correctedMean/count, mean, kahanC)
var mean, kahanC float64
for i, f := range s.Floats {
count := float64(i + 1)
q := float64(i) / count
mean, kahanC = kahanSumInc(f.F/count, q*mean, q*kahanC)
}
if incrementalMean {
return mean + kahanC
}
return (sum + kahanC) / count
return mean + kahanC
}), nil
}