promql: reuse histogramStatsIterator where possible, and expose it for other implementations to use (#16686)

* Expose type
* Add `Reset` method

---------

Signed-off-by: Charles Korn <charles.korn@grafana.com>
This commit is contained in:
Charles Korn 2025-06-19 07:53:46 +10:00 committed by GitHub
parent 32b471ed47
commit 8d9dfa075d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 103 additions and 48 deletions

View File

@ -3931,6 +3931,12 @@ func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries {
} }
func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
// Try to reuse the iterator if we can.
if statsIterator, ok := it.(*HistogramStatsIterator); ok {
statsIterator.Reset(s.Series.Iterator(statsIterator.Iterator))
return statsIterator
}
return NewHistogramStatsIterator(s.Series.Iterator(it)) return NewHistogramStatsIterator(s.Series.Iterator(it))
} }

View File

@ -19,7 +19,11 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
) )
type histogramStatsIterator struct { // HistogramStatsIterator is an iterator that returns histogram objects
// which have only their sum and count values populated. The iterator handles
// counter reset detection internally and sets the counter reset hint accordingly
// in each returned histogram object.
type HistogramStatsIterator struct {
chunkenc.Iterator chunkenc.Iterator
currentH *histogram.Histogram currentH *histogram.Histogram
@ -27,24 +31,30 @@ type histogramStatsIterator struct {
currentFH *histogram.FloatHistogram currentFH *histogram.FloatHistogram
lastFH *histogram.FloatHistogram lastFH *histogram.FloatHistogram
currentSeriesRead bool
} }
// NewHistogramStatsIterator creates an iterator which returns histogram objects // NewHistogramStatsIterator creates a new HistogramStatsIterator.
// which have only their sum and count values populated. The iterator handles func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator {
// counter reset detection internally and sets the counter reset hint accordingly return &HistogramStatsIterator{
// in each returned histogram objects.
func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator {
return &histogramStatsIterator{
Iterator: it, Iterator: it,
currentH: &histogram.Histogram{}, currentH: &histogram.Histogram{},
currentFH: &histogram.FloatHistogram{}, currentFH: &histogram.FloatHistogram{},
} }
} }
// Reset resets this iterator for use with a new underlying iterator, reusing
// objects already allocated where possible.
func (f *HistogramStatsIterator) Reset(it chunkenc.Iterator) {
f.Iterator = it
f.currentSeriesRead = false
}
// AtHistogram returns the next timestamp/histogram pair. The counter reset // AtHistogram returns the next timestamp/histogram pair. The counter reset
// detection is guaranteed to be correct only when the caller does not switch // detection is guaranteed to be correct only when the caller does not switch
// between AtHistogram and AtFloatHistogram calls. // between AtHistogram and AtFloatHistogram calls.
func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { func (f *HistogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
var t int64 var t int64
t, f.currentH = f.Iterator.AtHistogram(f.currentH) t, f.currentH = f.Iterator.AtHistogram(f.currentH)
if value.IsStaleNaN(f.currentH.Sum) { if value.IsStaleNaN(f.currentH.Sum) {
@ -76,7 +86,7 @@ func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *hi
// AtFloatHistogram returns the next timestamp/float histogram pair. The counter // AtFloatHistogram returns the next timestamp/float histogram pair. The counter
// reset detection is guaranteed to be correct only when the caller does not // reset detection is guaranteed to be correct only when the caller does not
// switch between AtHistogram and AtFloatHistogram calls. // switch between AtHistogram and AtFloatHistogram calls.
func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { func (f *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
var t int64 var t int64
t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH) t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH)
if value.IsStaleNaN(f.currentFH.Sum) { if value.IsStaleNaN(f.currentFH.Sum) {
@ -104,31 +114,35 @@ func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
return t, fh return t, fh
} }
func (f *histogramStatsIterator) setLastH(h *histogram.Histogram) { func (f *HistogramStatsIterator) setLastH(h *histogram.Histogram) {
f.lastFH = nil f.lastFH = nil
if f.lastH == nil { if f.lastH == nil {
f.lastH = h.Copy() f.lastH = h.Copy()
} else { } else {
h.CopyTo(f.lastH) h.CopyTo(f.lastH)
} }
f.currentSeriesRead = true
} }
func (f *histogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { func (f *HistogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) {
f.lastH = nil f.lastH = nil
if f.lastFH == nil { if f.lastFH == nil {
f.lastFH = fh.Copy() f.lastFH = fh.Copy()
} else { } else {
fh.CopyTo(f.lastFH) fh.CopyTo(f.lastFH)
} }
f.currentSeriesRead = true
} }
func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { func (f *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint {
if hint != histogram.UnknownCounterReset { if hint != histogram.UnknownCounterReset {
return hint return hint
} }
prevFH := f.lastFH prevFH := f.lastFH
if prevFH == nil { if prevFH == nil || !f.currentSeriesRead {
if f.lastH == nil { if f.lastH == nil || !f.currentSeriesRead {
// We don't know if there's a counter reset. // We don't know if there's a counter reset.
return histogram.UnknownCounterReset return histogram.UnknownCounterReset
} }
@ -140,13 +154,13 @@ func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHi
return histogram.NotCounterReset return histogram.NotCounterReset
} }
func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { func (f *HistogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint {
if h.CounterResetHint != histogram.UnknownCounterReset { if h.CounterResetHint != histogram.UnknownCounterReset {
return h.CounterResetHint return h.CounterResetHint
} }
var prevFH *histogram.FloatHistogram var prevFH *histogram.FloatHistogram
if f.lastH == nil { if f.lastH == nil || !f.currentSeriesRead {
if f.lastFH == nil { if f.lastFH == nil || !f.currentSeriesRead {
// We don't know if there's a counter reset. // We don't know if there's a counter reset.
return histogram.UnknownCounterReset return histogram.UnknownCounterReset
} }

View File

@ -33,7 +33,7 @@ func TestHistogramStatsDecoding(t *testing.T) {
expectedHints []histogram.CounterResetHint expectedHints []histogram.CounterResetHint
}{ }{
{ {
name: "unknown counter reset triggers detection", name: "unknown counter reset for later sample triggers detection",
histograms: []*histogram.Histogram{ histograms: []*histogram.Histogram{
tsdbutil.GenerateTestHistogramWithHint(0, histogram.NotCounterReset), tsdbutil.GenerateTestHistogramWithHint(0, histogram.NotCounterReset),
tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset), tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
@ -47,6 +47,21 @@ func TestHistogramStatsDecoding(t *testing.T) {
histogram.NotCounterReset, histogram.NotCounterReset,
}, },
}, },
{
name: "unknown counter reset for first sample does not trigger detection",
histograms: []*histogram.Histogram{
tsdbutil.GenerateTestHistogramWithHint(0, histogram.UnknownCounterReset),
tsdbutil.GenerateTestHistogramWithHint(1, histogram.UnknownCounterReset),
tsdbutil.GenerateTestHistogramWithHint(2, histogram.CounterReset),
tsdbutil.GenerateTestHistogramWithHint(2, histogram.UnknownCounterReset),
},
expectedHints: []histogram.CounterResetHint{
histogram.UnknownCounterReset,
histogram.NotCounterReset,
histogram.CounterReset,
histogram.NotCounterReset,
},
},
{ {
name: "stale sample before unknown reset hint", name: "stale sample before unknown reset hint",
histograms: []*histogram.Histogram{ histograms: []*histogram.Histogram{
@ -100,42 +115,62 @@ func TestHistogramStatsDecoding(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Run("histogram_stats", func(t *testing.T) { t.Run("histogram_stats", func(t *testing.T) {
decodedStats := make([]*histogram.Histogram, 0) check := func(statsIterator *HistogramStatsIterator) {
statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) decodedStats := make([]*histogram.Histogram, 0)
for statsIterator.Next() != chunkenc.ValNone {
_, h := statsIterator.AtHistogram(nil) for statsIterator.Next() != chunkenc.ValNone {
decodedStats = append(decodedStats, h) _, h := statsIterator.AtHistogram(nil)
} decodedStats = append(decodedStats, h)
for i := 0; i < len(tc.histograms); i++ { }
require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i)
h := tc.histograms[i] for i := 0; i < len(tc.histograms); i++ {
if value.IsStaleNaN(h.Sum) { require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i)
require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) h := tc.histograms[i]
require.Equal(t, uint64(0), decodedStats[i].Count) if value.IsStaleNaN(h.Sum) {
} else { require.True(t, value.IsStaleNaN(decodedStats[i].Sum))
require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count) require.Equal(t, uint64(0), decodedStats[i].Count)
require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum) } else {
require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count)
require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum)
}
} }
} }
// Check that we get the expected results with a fresh iterator.
statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
// Check that we get the same results if we reset and reuse that iterator.
statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
}) })
t.Run("float_histogram_stats", func(t *testing.T) { t.Run("float_histogram_stats", func(t *testing.T) {
decodedStats := make([]*histogram.FloatHistogram, 0) check := func(statsIterator *HistogramStatsIterator) {
statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) decodedStats := make([]*histogram.FloatHistogram, 0)
for statsIterator.Next() != chunkenc.ValNone { for statsIterator.Next() != chunkenc.ValNone {
_, h := statsIterator.AtFloatHistogram(nil) _, h := statsIterator.AtFloatHistogram(nil)
decodedStats = append(decodedStats, h) decodedStats = append(decodedStats, h)
} }
for i := 0; i < len(tc.histograms); i++ { for i := 0; i < len(tc.histograms); i++ {
require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint)
fh := tc.histograms[i].ToFloat(nil) fh := tc.histograms[i].ToFloat(nil)
if value.IsStaleNaN(fh.Sum) { if value.IsStaleNaN(fh.Sum) {
require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) require.True(t, value.IsStaleNaN(decodedStats[i].Sum))
require.Equal(t, float64(0), decodedStats[i].Count) require.Equal(t, float64(0), decodedStats[i].Count)
} else { } else {
require.Equal(t, fh.Count, decodedStats[i].Count) require.Equal(t, fh.Count, decodedStats[i].Count)
require.Equal(t, fh.Sum, decodedStats[i].Sum) require.Equal(t, fh.Sum, decodedStats[i].Sum)
}
} }
} }
// Check that we get the expected results with a fresh iterator.
statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
// Check that we get the same results if we reset and reuse that iterator.
statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil))
check(statsIterator)
}) })
}) })
} }