From c006c57efceec54c7bb44f571cbb208a357bdd9b Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Thu, 1 Feb 2024 13:22:38 -0300
Subject: [PATCH] Proposal to improve FPointSlice and HPointSlice allocation.
 (#13448)

* Reusing the points slice from the previous series when the slice is underutilized

* Adding comments on the bench test

Signed-off-by: Alan Protasio
---
 promql/bench_test.go | 19 +++++++++++++++++++
 promql/engine.go     | 38 ++++++++++++++++++++++++++++++++++----
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/promql/bench_test.go b/promql/bench_test.go
index b7a4978de2..516b0d7482 100644
--- a/promql/bench_test.go
+++ b/promql/bench_test.go
@@ -33,6 +33,10 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
 	ctx := context.Background()
 
 	metrics := []labels.Labels{}
+	// Generating test series: a_X, b_X, and h_X, where X can be one, ten, or hundred,
+	// representing the number of series each metric name contains.
+	// Metrics a_X and b_X are simple float metrics, whereas h_X is a histogram.
+	// These metrics will have data for the entire test time range.
 	metrics = append(metrics, labels.FromStrings("__name__", "a_one"))
 	metrics = append(metrics, labels.FromStrings("__name__", "b_one"))
 	for j := 0; j < 10; j++ {
@@ -59,6 +63,9 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
 	}
 	refs := make([]storage.SeriesRef, len(metrics))
 
+	// Number of points for each distinct value of label "l" in the sparse series.
+	pointsPerSparseSeries := numIntervals / 50
+
 	for s := 0; s < numIntervals; s++ {
 		a := stor.Appender(context.Background())
 		ts := int64(s * interval)
@@ -66,10 +73,18 @@ func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval,
 			ref, _ := a.Append(refs[i], metric, ts, float64(s)+float64(i)/float64(len(metrics)))
 			refs[i] = ref
 		}
+		// Generating a sparse time series: each value of label "l" will contain data only for
+		// pointsPerSparseSeries points.
+		metric := labels.FromStrings("__name__", "sparse", "l", strconv.Itoa(s/pointsPerSparseSeries))
+		_, err := a.Append(0, metric, ts, float64(s)/float64(len(metrics)))
+		if err != nil {
+			return err
+		}
 		if err := a.Commit(); err != nil {
 			return err
 		}
 	}
+	stor.DB.ForceHeadMMap() // Ensure we have at most one head chunk for every series.
 	stor.DB.Compact(ctx)
 	return nil
 }
@@ -94,6 +109,10 @@ func rangeQueryCases() []benchCase {
 			expr:  "rate(a_X[1m])",
 			steps: 10000,
 		},
+		{
+			expr:  "rate(sparse[1m])",
+			steps: 10000,
+		},
 		// Holt-Winters and long ranges.
 		{
 			expr: "holt_winters(a_X[1d], 0.3, 0.3)",
diff --git a/promql/engine.go b/promql/engine.go
index 786b76e1f4..02004e5f96 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -1452,6 +1452,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 		// Reuse objects across steps to save memory allocations.
 		var floats []FPoint
 		var histograms []HPoint
+		var prevSS *Series
 		inMatrix := make(Matrix, 1)
 		inArgs[matrixArgIndex] = inMatrix
 		enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
@@ -1512,12 +1513,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 				if len(outVec) > 0 {
 					if outVec[0].H == nil {
 						if ss.Floats == nil {
-							ss.Floats = getFPointSlice(numSteps)
+							ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
 						}
+
 						ss.Floats = append(ss.Floats, FPoint{F: outVec[0].F, T: ts})
 					} else {
 						if ss.Histograms == nil {
-							ss.Histograms = getHPointSlice(numSteps)
+							ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
 						}
 						ss.Histograms = append(ss.Histograms, HPoint{H: outVec[0].H, T: ts})
 					}
@@ -1526,9 +1528,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 				it.ReduceDelta(stepRange)
 			}
 			histSamples := totalHPointSize(ss.Histograms)
+
 			if len(ss.Floats)+histSamples > 0 {
 				if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples {
 					mat = append(mat, ss)
+					prevSS = &mat[len(mat)-1]
 					ev.currentSamples += len(ss.Floats) + histSamples
 				} else {
 					ev.error(ErrTooManySamples(env))
@@ -1678,6 +1682,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
 		}
 		mat := make(Matrix, 0, len(e.Series))
+		var prevSS *Series
 		it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
 		var chkIter chunkenc.Iterator
 		for i, s := range e.Series {
@@ -1697,14 +1702,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 					if ev.currentSamples < ev.maxSamples {
 						if h == nil {
 							if ss.Floats == nil {
-								ss.Floats = getFPointSlice(numSteps)
+								ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
 							}
 							ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
 							ev.currentSamples++
 							ev.samplesStats.IncrementSamplesAtStep(step, 1)
 						} else {
 							if ss.Histograms == nil {
-								ss.Histograms = getHPointSlice(numSteps)
+								ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
 							}
 							point := HPoint{H: h, T: ts}
 							ss.Histograms = append(ss.Histograms, point)
@@ -1720,6 +1725,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 
 			if len(ss.Floats)+len(ss.Histograms) > 0 {
 				mat = append(mat, ss)
+				prevSS = &mat[len(mat)-1]
 			}
 		}
 		ev.samplesStats.UpdatePeak(ev.currentSamples)
@@ -1840,6 +1846,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 	panic(fmt.Errorf("unhandled expression of type: %T", expr))
 }
 
+// reuseOrGetHPointSlices reuses the space from the previous slice to create a new slice if the former has plenty of spare room.
+// The previous slice's capacity is adjusted so that, when it is reused from the pool, it doesn't overflow into the new one.
+func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) {
+	if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 {
+		r = prevSS.Histograms[len(prevSS.Histograms):]
+		prevSS.Histograms = prevSS.Histograms[0:len(prevSS.Histograms):len(prevSS.Histograms)]
+		return
+	}
+
+	return getHPointSlice(numSteps)
+}
+
+// reuseOrGetFPointSlices reuses the space from the previous slice to create a new slice if the former has plenty of spare room.
+// The previous slice's capacity is adjusted so that, when it is reused from the pool, it doesn't overflow into the new one.
+func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) {
+	if prevSS != nil && cap(prevSS.Floats)-2*len(prevSS.Floats) > 0 {
+		r = prevSS.Floats[len(prevSS.Floats):]
+		prevSS.Floats = prevSS.Floats[0:len(prevSS.Floats):len(prevSS.Floats)]
+		return
+	}
+
+	return getFPointSlice(numSteps)
+}
+
 func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
 	ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
 	if err != nil {
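
For reference, below is a minimal standalone sketch (separate from the patch above) of the capacity-clamping trick the new reuseOrGetFPointSlices/reuseOrGetHPointSlices helpers rely on: Go's full slice expression s[low:high:max]. Point, Series, and reuseOrGet here are hypothetical stand-ins rather than the promql types, and the pool lookup is replaced by a plain make.

package main

import "fmt"

// Point and Series are illustrative stand-ins for promql's FPoint and Series.
type Point struct {
	T int64
	F float64
}

type Series struct {
	Points []Point
}

// reuseOrGet hands out the spare tail of prev's backing array when prev uses less
// than half of its capacity; otherwise it allocates a fresh slice. The full slice
// expression prev.Points[0:len:len] clamps prev's capacity so that later appends to
// prev cannot write into the region now owned by the returned slice.
func reuseOrGet(prev *Series, numSteps int) []Point {
	if prev != nil && cap(prev.Points)-2*len(prev.Points) > 0 {
		r := prev.Points[len(prev.Points):]
		prev.Points = prev.Points[0:len(prev.Points):len(prev.Points)]
		return r
	}
	return make([]Point, 0, numSteps) // the real helpers take this from a pool instead
}

func main() {
	prev := &Series{Points: make([]Point, 2, 16)} // 2 points stored, 14 slots spare
	next := reuseOrGet(prev, 8)
	fmt.Println(cap(prev.Points), cap(next)) // prints "2 14": the spare capacity moved to next
}

Because the previous series is finished being appended to by the time the next one starts, handing its unused tail to the next series avoids a fresh pool allocation, and the clamped capacity guarantees the two slices can never alias the same elements.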