diff --git a/promql/engine.go b/promql/engine.go index 9024df83da..91257eae37 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3891,20 +3891,34 @@ func detectHistogramStatsDecoding(expr parser.Expr) { return nil } - for i := len(path) - 1; i > 0; i-- { // Walk backwards up the path. + pathLoop: + for i := len(path) - 1; i >= 0; i-- { // Walk backwards up the path. + if _, ok := path[i].(*parser.SubqueryExpr); ok { + // If we ever see a subquery in the path, we + // will not skip the buckets. We need the + // buckets for correct counter reset detection. + n.SkipHistogramBuckets = false + break pathLoop + } call, ok := path[i].(*parser.Call) if !ok { - continue + continue pathLoop } switch call.Func.Name { case "histogram_count", "histogram_sum", "histogram_avg": + // We allow skipping buckets preliminarily. But + // we will continue through the path to see if + // we find a subquery (or a histogram function) + // further up (the latter wouldn't make sense, + // but no harm in detecting it). n.SkipHistogramBuckets = true case "histogram_quantile", "histogram_fraction": + // If we ever see a function that needs the + // whole histogram, we will not skip the + // buckets. n.SkipHistogramBuckets = false - default: - continue + break pathLoop } - break } return errors.New("stop") }) diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go index f5224825d3..e58cc7d848 100644 --- a/promql/histogram_stats_iterator.go +++ b/promql/histogram_stats_iterator.go @@ -19,28 +19,26 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) -// HistogramStatsIterator is an iterator that returns histogram objects -// which have only their sum and count values populated. The iterator handles -// counter reset detection internally and sets the counter reset hint accordingly -// in each returned histogram object. +// HistogramStatsIterator is an iterator that returns histogram objects that +// have only their sum and count values populated. 
The iterator handles counter +// reset detection internally and sets the counter reset hint accordingly in +// each returned histogram object. The Next and Seek methods of the iterator +// will never return ValHistogram, but ValFloatHistogram instead. Effectively, +// the iterator enforces conversion of (integer) Histogram to FloatHistogram. +// The AtHistogram method must not be called (and will panic). type HistogramStatsIterator struct { chunkenc.Iterator - currentH *histogram.Histogram - lastH *histogram.Histogram - - currentFH *histogram.FloatHistogram - lastFH *histogram.FloatHistogram - - currentSeriesRead bool + current *histogram.FloatHistogram + last *histogram.FloatHistogram + lastIsCurrent bool } // NewHistogramStatsIterator creates a new HistogramStatsIterator. func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator { return &HistogramStatsIterator{ - Iterator: it, - currentH: &histogram.Histogram{}, - currentFH: &histogram.FloatHistogram{}, + Iterator: it, + current: &histogram.FloatHistogram{}, } } @@ -48,136 +46,117 @@ func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator { // objects already allocated where possible. func (hsi *HistogramStatsIterator) Reset(it chunkenc.Iterator) { hsi.Iterator = it - hsi.currentSeriesRead = false + hsi.last = nil + hsi.lastIsCurrent = false } -// AtHistogram returns the next timestamp/histogram pair. The counter reset -// detection is guaranteed to be correct only when the caller does not switch -// between AtHistogram and AtFloatHistogram calls. -func (hsi *HistogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - var t int64 - t, hsi.currentH = hsi.Iterator.AtHistogram(hsi.currentH) - if value.IsStaleNaN(hsi.currentH.Sum) { - h = &histogram.Histogram{Sum: hsi.currentH.Sum} - return t, h +// Next mostly relays to the underlying iterator, but changes a ValHistogram +// return into a ValFloatHistogram return. 
+func (hsi *HistogramStatsIterator) Next() chunkenc.ValueType { + hsi.lastIsCurrent = false + vt := hsi.Iterator.Next() + if vt == chunkenc.ValHistogram { + return chunkenc.ValFloatHistogram } - - if h == nil { - h = &histogram.Histogram{ - CounterResetHint: hsi.getResetHint(hsi.currentH), - Count: hsi.currentH.Count, - Sum: hsi.currentH.Sum, - } - hsi.setLastH(hsi.currentH) - return t, h - } - - returnValue := histogram.Histogram{ - CounterResetHint: hsi.getResetHint(hsi.currentH), - Count: hsi.currentH.Count, - Sum: hsi.currentH.Sum, - } - returnValue.CopyTo(h) - - hsi.setLastH(hsi.currentH) - return t, h + return vt } -// AtFloatHistogram returns the next timestamp/float histogram pair. The counter -// reset detection is guaranteed to be correct only when the caller does not -// switch between AtHistogram and AtFloatHistogram calls. +// Seek mostly relays to the underlying iterator, but changes a ValHistogram +// return into a ValFloatHistogram return. +func (hsi *HistogramStatsIterator) Seek(t int64) chunkenc.ValueType { + // If the Seek is going to move the iterator, we have to forget + // hsi.last and mark it as not current anymore (lastIsCurrent). + if t > hsi.AtT() { + hsi.last = nil + hsi.lastIsCurrent = false + } + vt := hsi.Iterator.Seek(t) + if vt == chunkenc.ValHistogram { + return chunkenc.ValFloatHistogram + } + return vt +} + +// AtHistogram must never be called. +func (*HistogramStatsIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + panic("HistogramStatsIterator.AtHistogram must never be called") +} + +// AtFloatHistogram returns the current timestamp/float histogram pair. The method +// performs a counter reset detection on the fly. It will return an explicit +// hint (not UnknownCounterReset) if the previous sample has been accessed with +// the same iterator. 
func (hsi *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - var t int64 - t, hsi.currentFH = hsi.Iterator.AtFloatHistogram(hsi.currentFH) - if value.IsStaleNaN(hsi.currentFH.Sum) { - return t, &histogram.FloatHistogram{Sum: hsi.currentFH.Sum} + populateFH := func(src *histogram.FloatHistogram, detectReset bool) { + h := histogram.FloatHistogram{ + CounterResetHint: src.CounterResetHint, + Count: src.Count, + Sum: src.Sum, + } + if detectReset { + h.CounterResetHint = hsi.getResetHint(src.CounterResetHint) + } + if fh == nil { + // Note that we cannot simply write `fh = &h` here + // because that would let h escape to the heap. + fh = &histogram.FloatHistogram{} + *fh = h + } else { + h.CopyTo(fh) + } } - if fh == nil { - fh = &histogram.FloatHistogram{ - CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint), - Count: hsi.currentFH.Count, - Sum: hsi.currentFH.Sum, - } - hsi.setLastFH(hsi.currentFH) + if hsi.lastIsCurrent { + // Nothing changed since last AtFloatHistogram call. Return a + // copy of the stored last histogram rather than doing counter + // reset detection again (which would yield a potentially wrong + // result of "no counter reset"). 
+ populateFH(hsi.last, false) + return hsi.AtT(), fh + } + + var t int64 + t, hsi.current = hsi.Iterator.AtFloatHistogram(hsi.current) + if value.IsStaleNaN(hsi.current.Sum) { + populateFH(hsi.current, false) return t, fh } - - returnValue := histogram.FloatHistogram{ - CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint), - Count: hsi.currentFH.Count, - Sum: hsi.currentFH.Sum, - } - returnValue.CopyTo(fh) - - hsi.setLastFH(hsi.currentFH) + populateFH(hsi.current, true) + hsi.setLastFromCurrent(fh.CounterResetHint) return t, fh } -func (hsi *HistogramStatsIterator) setLastH(h *histogram.Histogram) { - hsi.lastFH = nil - if hsi.lastH == nil { - hsi.lastH = h.Copy() +// setLastFromCurrent stores a copy of hsi.current as hsi.last. The +// CounterResetHint of hsi.last is set to the provided value, though. This is +// meant to store the value we have calculated on the fly so that we can return +// the same without re-calculation in case AtFloatHistogram is called multiple +// times. +func (hsi *HistogramStatsIterator) setLastFromCurrent(hint histogram.CounterResetHint) { + if hsi.last == nil { + hsi.last = hsi.current.Copy() } else { - h.CopyTo(hsi.lastH) + hsi.current.CopyTo(hsi.last) } - - hsi.currentSeriesRead = true + hsi.last.CounterResetHint = hint + hsi.lastIsCurrent = true } -func (hsi *HistogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { - hsi.lastH = nil - if hsi.lastFH == nil { - hsi.lastFH = fh.Copy() - } else { - fh.CopyTo(hsi.lastFH) - } - - hsi.currentSeriesRead = true -} - -func (hsi *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { +func (hsi *HistogramStatsIterator) getResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { if hint != histogram.UnknownCounterReset { return hint } - prevFH := hsi.lastFH - if prevFH == nil || !hsi.currentSeriesRead { - if hsi.lastH == nil || !hsi.currentSeriesRead { - // We don't know if there's a counter reset. 
- return histogram.UnknownCounterReset - } - prevFH = hsi.lastH.ToFloat(nil) + if hsi.last == nil { + // We don't know if there's a counter reset. Note that this + // generally will trigger an explicit counter reset detection by + // the PromQL engine, which in turn isn't as reliable in this + // case because the PromQL engine will not see the buckets. + // However, we can assume that in cases where the counter reset + // detection is relevant, an iteration through the series has + // happened, and therefore we do not end up here in the first + // place. + return histogram.UnknownCounterReset } - if hsi.currentFH.DetectReset(prevFH) { - return histogram.CounterReset - } - return histogram.NotCounterReset -} - -func (hsi *HistogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { - if h.CounterResetHint != histogram.UnknownCounterReset { - return h.CounterResetHint - } - var prevFH *histogram.FloatHistogram - if hsi.lastH == nil || !hsi.currentSeriesRead { - if hsi.lastFH == nil || !hsi.currentSeriesRead { - // We don't know if there's a counter reset. Note that - // this generally will trigger an explicit counter reset - // detection by the PromQL engine, which in turn isn't - // as reliable in this case because the PromQL engine - // will not see the buckets. However, we can assume that - // in cases where the counter reset detection is - // relevant, an iteration through the series has - // happened, and therefore we do not end up here in the - // first place. 
- return histogram.UnknownCounterReset - } - prevFH = hsi.lastFH - } else { - prevFH = hsi.lastH.ToFloat(nil) - } - fh := h.ToFloat(nil) - if fh.DetectReset(prevFH) { + if hsi.current.DetectReset(hsi.last) { return histogram.CounterReset } return histogram.NotCounterReset diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go index 3e3f2dd4b2..80bfee519d 100644 --- a/promql/histogram_stats_iterator_test.go +++ b/promql/histogram_stats_iterator_test.go @@ -114,64 +114,38 @@ func TestHistogramStatsDecoding(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - t.Run("histogram_stats", func(t *testing.T) { - check := func(statsIterator *HistogramStatsIterator) { - decodedStats := make([]*histogram.Histogram, 0) - - for statsIterator.Next() != chunkenc.ValNone { - _, h := statsIterator.AtHistogram(nil) - decodedStats = append(decodedStats, h) - } - - for i := 0; i < len(tc.histograms); i++ { - require.Equalf(t, tc.expectedHints[i], decodedStats[i].CounterResetHint, "mismatch in counter reset hint for histogram %d", i) - h := tc.histograms[i] - if value.IsStaleNaN(h.Sum) { - require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) - require.Equal(t, uint64(0), decodedStats[i].Count) - } else { - require.Equal(t, tc.histograms[i].Count, decodedStats[i].Count) - require.Equal(t, tc.histograms[i].Sum, decodedStats[i].Sum) - } + check := func(statsIterator *HistogramStatsIterator) { + decodedStats := make([]*histogram.FloatHistogram, 0) + for typ := statsIterator.Next(); typ != chunkenc.ValNone; typ = statsIterator.Next() { + require.Equal(t, chunkenc.ValFloatHistogram, typ) + t1, h1 := statsIterator.AtFloatHistogram(nil) + // Call AtFloatHistogram again to check for idempotency. + t2, h2 := statsIterator.AtFloatHistogram(nil) + require.Equal(t, t1, t2) + require.True(t, h1.Equals(h2)) // require.Equal does not work with sum=NaN. 
+ decodedStats = append(decodedStats, h1) + } + require.NoError(t, statsIterator.Err()) + for i := 0; i < len(tc.histograms); i++ { + require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) + fh := tc.histograms[i].ToFloat(nil) + if value.IsStaleNaN(fh.Sum) { + require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) + require.Equal(t, float64(0), decodedStats[i].Count) + } else { + require.Equal(t, fh.Count, decodedStats[i].Count) + require.Equal(t, fh.Sum, decodedStats[i].Sum) } } + } - // Check that we get the expected results with a fresh iterator. - statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) - check(statsIterator) + // Check that we get the expected results with a fresh iterator. + statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) - // Check that we get the same results if we reset and reuse that iterator. - statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) - check(statsIterator) - }) - t.Run("float_histogram_stats", func(t *testing.T) { - check := func(statsIterator *HistogramStatsIterator) { - decodedStats := make([]*histogram.FloatHistogram, 0) - for statsIterator.Next() != chunkenc.ValNone { - _, h := statsIterator.AtFloatHistogram(nil) - decodedStats = append(decodedStats, h) - } - for i := 0; i < len(tc.histograms); i++ { - require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) - fh := tc.histograms[i].ToFloat(nil) - if value.IsStaleNaN(fh.Sum) { - require.True(t, value.IsStaleNaN(decodedStats[i].Sum)) - require.Equal(t, float64(0), decodedStats[i].Count) - } else { - require.Equal(t, fh.Count, decodedStats[i].Count) - require.Equal(t, fh.Sum, decodedStats[i].Sum) - } - } - } - - // Check that we get the expected results with a fresh iterator. 
- statsIterator := NewHistogramStatsIterator(newHistogramSeries(tc.histograms).Iterator(nil)) - check(statsIterator) - - // Check that we get the same results if we reset and reuse that iterator. - statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) - check(statsIterator) - }) + // Check that we get the same results if we reset and reuse that iterator. + statsIterator.Reset(newHistogramSeries(tc.histograms).Iterator(nil)) + check(statsIterator) }) } } @@ -193,17 +167,21 @@ func TestHistogramStatsMixedUse(t *testing.T) { histogram.NotCounterReset, histogram.CounterReset, } + // Note that statsIterator always returns float histograms. actualHints := make([]histogram.CounterResetHint, 3) typ := statsIterator.Next() - require.Equal(t, chunkenc.ValHistogram, typ) - _, h := statsIterator.AtHistogram(nil) + require.Equal(t, chunkenc.ValFloatHistogram, typ) + _, h := statsIterator.AtFloatHistogram(nil) actualHints[0] = h.CounterResetHint typ = statsIterator.Next() - require.Equal(t, chunkenc.ValHistogram, typ) - _, h = statsIterator.AtHistogram(nil) + require.Equal(t, chunkenc.ValFloatHistogram, typ) + _, h = statsIterator.AtFloatHistogram(nil) + // We call AtFloatHistogram here again "randomly" to check idempotency. 
+ _, h2 := statsIterator.AtFloatHistogram(nil) + require.True(t, h.Equals(h2)) actualHints[1] = h.CounterResetHint typ = statsIterator.Next() - require.Equal(t, chunkenc.ValHistogram, typ) + require.Equal(t, chunkenc.ValFloatHistogram, typ) _, fh := statsIterator.AtFloatHistogram(nil) actualHints[2] = fh.CounterResetHint diff --git a/promql/promqltest/testdata/native_histograms.test b/promql/promqltest/testdata/native_histograms.test index 0958b8951e..83933e1e7f 100644 --- a/promql/promqltest/testdata/native_histograms.test +++ b/promql/promqltest/testdata/native_histograms.test @@ -1608,6 +1608,24 @@ eval instant at 1m histogram_quantile(0.5, myHistogram2) eval instant at 1m histogram_quantile(0.5, mixedHistogram) expect warn msg: PromQL warning: vector contains a mix of classic and native histograms for metric name "mixedHistogram" +clear + +# A counter reset only in a bucket. Sub-queries still need to detect +# it via explicit counter reset detection. This test also runs it with +# histogram_count in the expression to make sure that the +# HistogramStatsIterator is not used. (The latter fails to correctly +# detect the counter resets because Seek is used with sub-queries. And the +# explicit counter reset detection done with sub-queries cannot access +# the buckets anymore, if HistogramStatsIterator is used.) +load 1m + h{} {{schema:0 count:1 sum:10 buckets:[1]}}+{{}}x20 {{schema:0 count:1 sum:10 buckets:[0 1]}}+{{}}x20 + +# Both evals below should yield the same value for the count. +eval instant at 41m histogram_count(increase(h[40m:9m])) + {} 1.4814814814814814 + +eval instant at 41m increase(h[40m:9m]) + {} {{count:1.4814814814814814 sum:14.814814814814813 counter_reset_hint:gauge offset:1 buckets:[1.4814814814814814]}} clear @@ -1617,31 +1635,28 @@ load 1m # Trigger an annotation about conflicting counter resets by going through the # HistogramStatsIterator, which creates counter reset hints on the fly. 
-eval instant at 5m 1*histogram_count(sum_over_time(reset{timing="late"}[5m])) +eval instant at 5m histogram_count(sum_over_time(reset{timing="late"}[5m])) expect warn msg: PromQL warning: conflicting counter resets during histogram addition {timing="late"} 7 -eval instant at 5m 1*histogram_count(sum(reset)) +eval instant at 5m histogram_count(sum(reset)) expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation {} 5 -eval instant at 5m 1*histogram_count(avg(reset)) +eval instant at 5m histogram_count(avg(reset)) expect warn msg: PromQL warning: conflicting counter resets during histogram aggregation {} 2.5 # No annotation with the right timing. -eval instant at 30s 1*histogram_count(sum(reset)) +eval instant at 30s histogram_count(sum(reset)) expect no_warn {} 3 -eval instant at 30s 1*histogram_count(avg(reset)) +eval instant at 30s histogram_count(avg(reset)) expect no_warn {} 1.5 # Ensure that the annotation does not happen with rate. -eval instant at 5m 1*histogram_count(rate(reset{timing="late"}[5m])) +eval instant at 5m histogram_count(rate(reset{timing="late"}[5m])) expect no_warn {timing="late"} 0.0175 - -# NOTE: The `1*` part in the expressions above should not be needed. -# It can be removed once https://github.com/prometheus/prometheus/pull/17127 is merged.