diff --git a/promql/engine.go b/promql/engine.go index b73364bc87..5757604b7a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1367,18 +1367,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) - vecNumSamples := result.TotalSamples() - ev.currentSamples += vecNumSamples - // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also - // needs to include the samples from the result here, as they're still in memory. - tempNumSamples += vecNumSamples - ev.samplesStats.UpdatePeak(ev.currentSamples) - - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { mat := make(Matrix, len(result)) @@ -1393,6 +1381,9 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } } // Reuse the original point slice. @@ -2946,7 +2937,33 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } } - // Construct the result Vector from the aggregated groups. + // Construct the result from the aggregated groups. + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) { + // If this could be an instant query, build a slice so the result is in consistent order. + if ev.endTimestamp == ev.startTimestamp { + enh.Out = append(enh.Out, Sample{Metric: lbls, F: f, H: h}) + } else { + // Otherwise the results are added into seriess elements. + hash := lbls.Hash() + ss, ok := seriess[hash] + if !ok { + ss = Series{Metric: lbls} + } + if h == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: enh.Ts, F: f}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: enh.Ts, H: h}) + } + seriess[hash] = ss + } + } for _, aggr := range orderedResult { switch op { case parser.AVG: @@ -2976,10 +2993,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par sort.Sort(sort.Reverse(aggr.heap)) } for _, v := range aggr.heap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) + add(v.Metric, v.F, nil) } continue // Bypass default append. @@ -2989,10 +3003,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par sort.Sort(sort.Reverse(aggr.reverseHeap)) } for _, v := range aggr.reverseHeap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) + add(v.Metric, v.F, nil) } continue // Bypass default append. @@ -3015,42 +3026,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par // For other aggregations, we already have the right value. } - enh.Out = append(enh.Out, Sample{ - Metric: aggr.labels, - F: aggr.floatValue, - H: aggr.histogramValue, - }) + add(aggr.labels, aggr.floatValue, aggr.histogramValue) } - ts := enh.Ts - // If this could be an instant query, shortcut so as not to change sort order. - if ev.endTimestamp == ev.startTimestamp { - return enh.Out, annos - } - - numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 - // Add samples in output vector to output series. - for _, sample := range enh.Out { - h := sample.Metric.Hash() - ss, ok := seriess[h] - if !ok { - ss = Series{Metric: sample.Metric} - } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } - seriess[h] = ss - } - - return nil, annos + return enh.Out, annos } // aggregationK evaluates count_values on vec. diff --git a/promql/engine_test.go b/promql/engine_test.go index 13731efd45..0202c15ae1 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -966,7 +966,7 @@ load 10s { Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", Start: time.Unix(201, 0), - PeakSamples: 8, + PeakSamples: 7, TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 12,