Merge pull request #17127 from prometheus/beorn7/histogram2

Fix and optimize `HistogramStatsIterator` usage
This commit is contained in:
Björn Rabenstein 2025-09-09 15:52:49 +02:00 committed by GitHub
commit fda99c6b35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 182 additions and 196 deletions

View File

@ -3891,20 +3891,34 @@ func detectHistogramStatsDecoding(expr parser.Expr) {
return nil return nil
} }
for i := len(path) - 1; i > 0; i-- { // Walk backwards up the path. pathLoop:
for i := len(path) - 1; i >= 0; i-- { // Walk backwards up the path.
if _, ok := path[i].(*parser.SubqueryExpr); ok {
// If we ever see a subquery in the path, we
// will not skip the buckets. We need the
// buckets for correct counter reset detection.
n.SkipHistogramBuckets = false
break pathLoop
}
call, ok := path[i].(*parser.Call) call, ok := path[i].(*parser.Call)
if !ok { if !ok {
continue continue pathLoop
} }
switch call.Func.Name { switch call.Func.Name {
case "histogram_count", "histogram_sum", "histogram_avg": case "histogram_count", "histogram_sum", "histogram_avg":
// We allow skipping buckets preliminarily. But
// we will continue through the path to see if
// we find a subquery (or a histogram function)
// further up (the latter wouldn't make sense,
// but no harm in detecting it).
n.SkipHistogramBuckets = true n.SkipHistogramBuckets = true
case "histogram_quantile", "histogram_fraction": case "histogram_quantile", "histogram_fraction":
// If we ever see a function that needs the
// whole histogram, we will not skip the
// buckets.
n.SkipHistogramBuckets = false n.SkipHistogramBuckets = false
default: break pathLoop
continue
} }
break
} }
return errors.New("stop") return errors.New("stop")
}) })

View File

@ -19,28 +19,26 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
) )
// HistogramStatsIterator is an iterator that returns histogram objects // HistogramStatsIterator is an iterator that returns histogram objects that
// which have only their sum and count values populated. The iterator handles // have only their sum and count values populated. The iterator handles counter
// counter reset detection internally and sets the counter reset hint accordingly // reset detection internally and sets the counter reset hint accordingly in
// in each returned histogram object. // each returned histogram object. The Next and Seek methods of the iterator
// will never return ValHistogram, but ValFloatHistogram instead. Effectively,
// the iterator enforces conversion of (integer) Histogram to FloatHistogram.
// The AtHistogram method must not be called (and will panic).
type HistogramStatsIterator struct { type HistogramStatsIterator struct {
chunkenc.Iterator chunkenc.Iterator
currentH *histogram.Histogram current *histogram.FloatHistogram
lastH *histogram.Histogram last *histogram.FloatHistogram
lastIsCurrent bool
currentFH *histogram.FloatHistogram
lastFH *histogram.FloatHistogram
currentSeriesRead bool
} }
// NewHistogramStatsIterator creates a new HistogramStatsIterator. // NewHistogramStatsIterator creates a new HistogramStatsIterator.
func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator { func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator {
return &HistogramStatsIterator{ return &HistogramStatsIterator{
Iterator: it, Iterator: it,
currentH: &histogram.Histogram{}, current: &histogram.FloatHistogram{},
currentFH: &histogram.FloatHistogram{},
} }
} }
@ -48,136 +46,117 @@ func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator {
// objects already allocated where possible. // objects already allocated where possible.
func (hsi *HistogramStatsIterator) Reset(it chunkenc.Iterator) { func (hsi *HistogramStatsIterator) Reset(it chunkenc.Iterator) {
hsi.Iterator = it hsi.Iterator = it
hsi.currentSeriesRead = false hsi.last = nil
hsi.lastIsCurrent = false
} }
// AtHistogram returns the next timestamp/histogram pair. The counter reset // Next mostly relays to the underlying iterator, but changes a ValHistogram
// detection is guaranteed to be correct only when the caller does not switch // return into a ValFloatHistogram return.
// between AtHistogram and AtFloatHistogram calls. func (hsi *HistogramStatsIterator) Next() chunkenc.ValueType {
func (hsi *HistogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { hsi.lastIsCurrent = false
var t int64 vt := hsi.Iterator.Next()
t, hsi.currentH = hsi.Iterator.AtHistogram(hsi.currentH) if vt == chunkenc.ValHistogram {
if value.IsStaleNaN(hsi.currentH.Sum) { return chunkenc.ValFloatHistogram
h = &histogram.Histogram{Sum: hsi.currentH.Sum} }
return t, h return vt
} }
if h == nil { // Seek mostly relays to the underlying iterator, but changes a ValHistogram
h = &histogram.Histogram{ // return into a ValFloatHistogram return.
CounterResetHint: hsi.getResetHint(hsi.currentH), func (hsi *HistogramStatsIterator) Seek(t int64) chunkenc.ValueType {
Count: hsi.currentH.Count, // If the Seek is going to move the iterator, we have to forget the
Sum: hsi.currentH.Sum, // lastFH and mark the currentFH as not current anymore.
if t > hsi.AtT() {
hsi.last = nil
hsi.lastIsCurrent = false
} }
hsi.setLastH(hsi.currentH) vt := hsi.Iterator.Seek(t)
return t, h if vt == chunkenc.ValHistogram {
return chunkenc.ValFloatHistogram
}
return vt
} }
returnValue := histogram.Histogram{ // AtHistogram must never be called.
CounterResetHint: hsi.getResetHint(hsi.currentH), func (*HistogramStatsIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
Count: hsi.currentH.Count, panic("HistogramStatsIterator.AtHistogram must never be called")
Sum: hsi.currentH.Sum,
}
returnValue.CopyTo(h)
hsi.setLastH(hsi.currentH)
return t, h
} }
// AtFloatHistogram returns the next timestamp/float histogram pair. The counter // AtFloatHistogram returns the next timestamp/float histogram pair. The method
// reset detection is guaranteed to be correct only when the caller does not // performs a counter reset detection on the fly. It will return an explicit
// switch between AtHistogram and AtFloatHistogram calls. // hint (not UnknownCounterReset) if the previous sample has been accessed with
// the same iterator.
func (hsi *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { func (hsi *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
var t int64 populateFH := func(src *histogram.FloatHistogram, detectReset bool) {
t, hsi.currentFH = hsi.Iterator.AtFloatHistogram(hsi.currentFH) h := histogram.FloatHistogram{
if value.IsStaleNaN(hsi.currentFH.Sum) { CounterResetHint: src.CounterResetHint,
return t, &histogram.FloatHistogram{Sum: hsi.currentFH.Sum} Count: src.Count,
Sum: src.Sum,
}
if detectReset {
h.CounterResetHint = hsi.getResetHint(src.CounterResetHint)
} }
if fh == nil { if fh == nil {
fh = &histogram.FloatHistogram{ // Note that we cannot simply write `fh = &h` here
CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint), // because that would let h escape to the heap.
Count: hsi.currentFH.Count, fh = &histogram.FloatHistogram{}
Sum: hsi.currentFH.Sum, *fh = h
} else {
h.CopyTo(fh)
} }
hsi.setLastFH(hsi.currentFH) }
if hsi.lastIsCurrent {
// Nothing changed since last AtFloatHistogram call. Return a
// copy of the stored last histogram rather than doing counter
// reset detection again (which would yield a potentially wrong
// result of "no counter reset").
populateFH(hsi.last, false)
return hsi.AtT(), fh
}
var t int64
t, hsi.current = hsi.Iterator.AtFloatHistogram(hsi.current)
if value.IsStaleNaN(hsi.current.Sum) {
populateFH(hsi.current, false)
return t, fh
}
populateFH(hsi.current, true)
hsi.setLastFromCurrent(fh.CounterResetHint)
return t, fh return t, fh
} }
returnValue := histogram.FloatHistogram{ // setLastFromCurrent stores a copy of hsi.current as hsi.last. The
CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint), // CounterResetHint of hsi.last is set to the provided value, though. This is
Count: hsi.currentFH.Count, // meant to store the value we have calculated on the fly so that we can return
Sum: hsi.currentFH.Sum, // the same without re-calculation in case AtFloatHistogram is called multiple
} // times.
returnValue.CopyTo(fh) func (hsi *HistogramStatsIterator) setLastFromCurrent(hint histogram.CounterResetHint) {
if hsi.last == nil {
hsi.setLastFH(hsi.currentFH) hsi.last = hsi.current.Copy()
return t, fh
}
func (hsi *HistogramStatsIterator) setLastH(h *histogram.Histogram) {
hsi.lastFH = nil
if hsi.lastH == nil {
hsi.lastH = h.Copy()
} else { } else {
h.CopyTo(hsi.lastH) hsi.current.CopyTo(hsi.last)
}
hsi.last.CounterResetHint = hint
hsi.lastIsCurrent = true
} }
hsi.currentSeriesRead = true func (hsi *HistogramStatsIterator) getResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint {
}
func (hsi *HistogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) {
hsi.lastH = nil
if hsi.lastFH == nil {
hsi.lastFH = fh.Copy()
} else {
fh.CopyTo(hsi.lastFH)
}
hsi.currentSeriesRead = true
}
func (hsi *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint {
if hint != histogram.UnknownCounterReset { if hint != histogram.UnknownCounterReset {
return hint return hint
} }
prevFH := hsi.lastFH if hsi.last == nil {
if prevFH == nil || !hsi.currentSeriesRead { // We don't know if there's a counter reset. Note that this
if hsi.lastH == nil || !hsi.currentSeriesRead { // generally will trigger an explicit counter reset detection by
// We don't know if there's a counter reset. // the PromQL engine, which in turn isn't as reliable in this
// case because the PromQL engine will not see the buckets.
// However, we can assume that in cases where the counter reset
// detection is relevant, an iteration through the series has
// happened, and therefore we do not end up here in the first
// place.
return histogram.UnknownCounterReset return histogram.UnknownCounterReset
} }
prevFH = hsi.lastH.ToFloat(nil) if hsi.current.DetectReset(hsi.last) {
}
if hsi.currentFH.DetectReset(prevFH) {
return histogram.CounterReset
}
return histogram.NotCounterReset
}
func (hsi *HistogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint {
if h.CounterResetHint != histogram.UnknownCounterReset {
return h.CounterResetHint
}
var prevFH *histogram.FloatHistogram
if hsi.lastH == nil || !hsi.currentSeriesRead {
if hsi.lastFH == nil || !hsi.currentSeriesRead {
// We don't know if there's a counter reset. Note that
// this generally will trigger an explicit counter reset
// detection by the PromQL engine, which in turn isn't
// as reliable in this case because the PromQL engine
// will not see the buckets. However, we can assume that
// in cases where the counter reset detection is
// relevant, an iteration through the series has
// happened, and therefore we do not end up here in the
// first place.
return histogram.UnknownCounterReset
}
prevFH = hsi.lastFH
} else {
prevFH = hsi.lastH.ToFloat(nil)
}
fh := h.ToFloat(nil)
if fh.DetectReset(prevFH) {
return histogram.CounterReset return histogram.CounterReset
} }
return histogram.NotCounterReset return histogram.NotCounterReset

View File

@ -114,43 +114,18 @@ func TestHistogramStatsDecoding(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Run("histogram_stats", func(t *testing.T) {
check := func(statsIterator *HistogramStatsIterator) {
decodedStats := make([]*histogram.Histogram, 0)
for statsIterator.Next() != chunkenc.ValNone {
_, h := statsIterator.AtHistogram(nil)
decodedStats = append(decodedStats, h)
}
for i := 0; i < len(tc.histograms); i++ {
require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i)
h := tc.histograms[i]
if value.IsStaleNaN(h.Sum) {
require.True(t, value.IsStaleNaN(decodedStats[i].Sum))
require.Equal(t, uint64(0), decodedStats[i].Count)
} else {
require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count)
require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum)
}
}
}
// Check that we get the expected results with a fresh iterator.
statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
// Check that we get the same results if we reset and reuse that iterator.
statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
})
t.Run("float_histogram_stats", func(t *testing.T) {
check := func(statsIterator *HistogramStatsIterator) { check := func(statsIterator *HistogramStatsIterator) {
decodedStats := make([]*histogram.FloatHistogram, 0) decodedStats := make([]*histogram.FloatHistogram, 0)
for statsIterator.Next() != chunkenc.ValNone { for typ := statsIterator.Next(); typ != chunkenc.ValNone; typ = statsIterator.Next() {
_, h := statsIterator.AtFloatHistogram(nil) require.Equal(t, chunkenc.ValFloatHistogram, typ)
decodedStats = append(decodedStats, h) t1, h1 := statsIterator.AtFloatHistogram(nil)
// Call AtFloatHistogram again to check for idempotency.
t2, h2 := statsIterator.AtFloatHistogram(nil)
require.Equal(t, t1, t2)
require.True(t, h1.Equals(h2)) // require.Equal does not work with sum=NaN.
decodedStats = append(decodedStats, h1)
} }
require.NoError(t, statsIterator.Err())
for i := 0; i < len(tc.histograms); i++ { for i := 0; i < len(tc.histograms); i++ {
require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint)
fh := tc.histograms[i].ToFloat(nil) fh := tc.histograms[i].ToFloat(nil)
@ -172,7 +147,6 @@ func TestHistogramStatsDecoding(t *testing.T) {
statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator) check(statsIterator)
}) })
})
} }
} }
@ -193,17 +167,21 @@ func TestHistogramStatsMixedUse(t *testing.T) {
histogram.NotCounterReset, histogram.NotCounterReset,
histogram.CounterReset, histogram.CounterReset,
} }
// Note that statsIterator always returns float histograms.
actualHints := make([]histogram.CounterResetHint, 3) actualHints := make([]histogram.CounterResetHint, 3)
typ := statsIterator.Next() typ := statsIterator.Next()
require.Equal(t, chunkenc.ValHistogram, typ) require.Equal(t, chunkenc.ValFloatHistogram, typ)
_, h := statsIterator.AtHistogram(nil) _, h := statsIterator.AtFloatHistogram(nil)
actualHints[0] = h.CounterResetHint actualHints[0] = h.CounterResetHint
typ = statsIterator.Next() typ = statsIterator.Next()
require.Equal(t, chunkenc.ValHistogram, typ) require.Equal(t, chunkenc.ValFloatHistogram, typ)
_, h = statsIterator.AtHistogram(nil) _, h = statsIterator.AtFloatHistogram(nil)
// We call AtFloatHistogram here again "randomly" to check idempotency.
_, h2 := statsIterator.AtFloatHistogram(nil)
require.True(t, h.Equals(h2))
actualHints[1] = h.CounterResetHint actualHints[1] = h.CounterResetHint
typ = statsIterator.Next() typ = statsIterator.Next()
require.Equal(t, chunkenc.ValHistogram, typ) require.Equal(t, chunkenc.ValFloatHistogram, typ)
_, fh := statsIterator.AtFloatHistogram(nil) _, fh := statsIterator.AtFloatHistogram(nil)
actualHints[2] = fh.CounterResetHint actualHints[2] = fh.CounterResetHint

View File

@ -1608,6 +1608,24 @@ eval instant at 1m histogram_quantile(0.5, myHistogram2)
eval instant at 1m histogram_quantile(0.5, mixedHistogram) eval instant at 1m histogram_quantile(0.5, mixedHistogram)
expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram"
clear
# A counter reset only in a bucket. Sub-queries still need to detect
# it via explicit counter reset detection. This test also runs it with
# histogram_count in the expression to make sure that the
# HistogramStatsIterator is not used. (The latter fails to correctly
# do the counter resets because Seek is used with sub-queries. And the
# explicit counter reset detection done with sub-queries cannot access
# the buckets anymore, if HistogramStatsIterator is used.)
load 1m
h{} {{schema:0 count:1 sum:10 buckets:[1]}}+{{}}x20 {{schema:0 count:1 sum:10 buckets:[0 1]}}+{{}}x20
# Both evals below should yield the same value for the count.
eval instant at 41m histogram_count(increase(h[40m:9m]))
{} 1.4814814814814814
eval instant at 41m increase(h[40m:9m])
{} {{count:1.4814814814814814 sum:14.814814814814813 counter_reset_hint:gauge offset:1 buckets:[1.4814814814814814]}}
clear clear
@ -1617,31 +1635,28 @@ load 1m
# Trigger an annotation about conflicting counter resets by going through the # Trigger an annotation about conflicting counter resets by going through the
# HistogramStatsIterator, which creates counter reset hints on the fly. # HistogramStatsIterator, which creates counter reset hints on the fly.
eval instant at 5m 1*histogram_count(sum_over_time(reset{timing="late"}[5m])) eval instant at 5m histogram_count(sum_over_time(reset{timing="late"}[5m]))
expect warn msg: PromQL warning: conflicting counter resets during histogram addition expect warn msg: PromQL warning: conflicting counter resets during histogram addition
{timing="late"} 7 {timing="late"} 7
eval instant at 5m 1*histogram_count(sum(reset)) eval instant at 5m histogram_count(sum(reset))
expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation
{} 5 {} 5
eval instant at 5m 1*histogram_count(avg(reset)) eval instant at 5m histogram_count(avg(reset))
expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation
{} 2.5 {} 2.5
# No annotation with the right timing. # No annotation with the right timing.
eval instant at 30s 1*histogram_count(sum(reset)) eval instant at 30s histogram_count(sum(reset))
expect no_warn expect no_warn
{} 3 {} 3
eval instant at 30s 1*histogram_count(avg(reset)) eval instant at 30s histogram_count(avg(reset))
expect no_warn expect no_warn
{} 1.5 {} 1.5
# Ensure that the annotation does not happen with rate. # Ensure that the annotation does not happen with rate.
eval instant at 5m 1*histogram_count(rate(reset{timing="late"}[5m])) eval instant at 5m histogram_count(rate(reset{timing="late"}[5m]))
expect no_warn expect no_warn
{timing="late"} 0.0175 {timing="late"} 0.0175
# NOTE: The `1*` part in the expressions above should not be needed.
# It can be removed once https://github.com/prometheus/prometheus/pull/17127 is merged.