From 87420774980288cc4d9c59342bf9ff278db8c04c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 2 Sep 2024 20:20:40 +0100 Subject: [PATCH] [BUGFIX] PromQL: pass Context so spans parent correctly Assigning to `evaluator.ctx` in `eval()` broke the parent-child relationship. Signed-off-by: Bryan Boreham --- promql/engine.go | 103 +++++++++++++++++++++----------------------- promql/functions.go | 9 ++-- 2 files changed, 53 insertions(+), 59 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index b54ce2d6dc..537eafb41e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -713,7 +713,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval startTimestamp: start, endTimestamp: start, interval: 1, - ctx: ctxInnerEval, maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: s.LookbackDelta, @@ -723,7 +722,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } query.sampleStats.InitStepTracking(start, start, 1) - val, warnings, err := evaluator.Eval(s.Expr) + val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) evalSpanTimer.Finish() @@ -772,7 +771,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval startTimestamp: timeMilliseconds(s.Start), endTimestamp: timeMilliseconds(s.End), interval: durationMilliseconds(s.Interval), - ctx: ctxInnerEval, maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: s.LookbackDelta, @@ -781,7 +779,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval enableDelayedNameRemoval: ng.enableDelayedNameRemoval, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) - val, warnings, err := evaluator.Eval(s.Expr) + val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) evalSpanTimer.Finish() @@ -1029,8 +1027,6 @@ func (e errWithWarnings) Error() string { return e.err.Error() } // querier and reports errors. On timeout or cancellation of its context it // terminates. type evaluator struct { - ctx context.Context - startTimestamp int64 // Start time in milliseconds. endTimestamp int64 // End time in milliseconds. interval int64 // Interval in milliseconds. @@ -1079,10 +1075,10 @@ func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp } } -func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { +func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { defer ev.recover(expr, &ws, &err) - v, ws = ev.eval(expr) + v, ws = ev.eval(ctx, expr) if ev.enableDelayedNameRemoval { ev.cleanupMetricLabels(v) } @@ -1133,7 +1129,7 @@ func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { // function call results. // The prepSeries function (if provided) can be used to prepare the helper // for each series, then passed to each call funcCall. -func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { +func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 matrixes := make([]Matrix, len(exprs)) origMatrixes := make([]Matrix, len(exprs)) @@ -1144,7 +1140,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // Functions will take string arguments from the expressions, not the values. if e != nil && e.Type() != parser.ValueTypeString { // ev.currentSamples will be updated to the correct value within the ev.eval call. - val, ws := ev.eval(e) + val, ws := ev.eval(ctx, e) warnings.Merge(ws) matrixes[i] = val.(Matrix) @@ -1196,7 +1192,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } // Reset number of samples in memory after each timestamp. @@ -1306,7 +1302,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) return mat, warnings } -func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { +func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { // Keep a copy of the original point slice so that it can be returned to the pool. origMatrix := slices.Clone(inputMatrix) defer func() { @@ -1386,7 +1382,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } // Reset number of samples in memory after each timestamp. @@ -1437,11 +1433,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. -func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { +func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { samplesStats := ev.samplesStats // Avoid double counting samples when running a subquery, those samples will be counted in later stage. ev.samplesStats = ev.samplesStats.NewChild() - val, ws := ev.eval(subq) + val, ws := ev.eval(ctx, subq) // But do incorporate the peak from the subquery samplesStats.UpdatePeakFromSubquery(ev.samplesStats) ev.samplesStats = samplesStats @@ -1468,17 +1464,16 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele } // eval evaluates the given expression as the given AST expression node requires. -func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotations) { +func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, annotations.Annotations) { // This is the top-level evaluation method. // Thus, we check for timeout/cancellation here. - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 // Create a new span to help investigate inner evaluation performances. - ctxWithSpan, span := otel.Tracer("").Start(ev.ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) - ev.ctx = ctxWithSpan + ctx, span := otel.Tracer("").Start(ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) defer span.End() switch e := expr.(type) { @@ -1500,7 +1495,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio sortedGrouping = append(sortedGrouping, valueLabel.Val) slices.Sort(sortedGrouping) } - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) }, e.Expr) } @@ -1510,16 +1505,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // param is the number k for topk/bottomk, or q for quantile. var fParam float64 if param != nil { - val, ws := ev.eval(param) + val, ws := ev.eval(ctx, param) warnings.Merge(ws) fParam = val.(Matrix)[0].Floats[0].F } // Now fetch the data to be aggregated. - val, ws := ev.eval(e.Expr) + val, ws := ev.eval(ctx, e.Expr) warnings.Merge(ws) inputMatrix := val.(Matrix) - result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam) + result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam) warnings.Merge(ws) ev.currentSamples = originalNumSamples + result.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -1537,7 +1532,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { - return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) + return ev.rangeEvalTimestampFunctionOverVectorSelector(ctx, vs, call, e) } } @@ -1561,7 +1556,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio matrixArgIndex = i matrixArg = true // Replacing parser.SubqueryExpr with parser.MatrixSelector. - val, totalSamples, ws := ev.evalSubquery(subq) + val, totalSamples, ws := ev.evalSubquery(ctx, subq) e.Args[i] = val warnings.Merge(ws) defer func() { @@ -1576,14 +1571,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // Special handling for functions that work on series not samples. switch e.Func.Name { case "label_replace": - return ev.evalLabelReplace(e.Args) + return ev.evalLabelReplace(ctx, e.Args) case "label_join": - return ev.evalLabelJoin(e.Args) + return ev.evalLabelJoin(ctx, e.Args) } if !matrixArg { // Does not have a matrix argument. - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, annos := call(v, e.Args, enh) return vec, warnings.Merge(annos) }, e.Args...) @@ -1595,7 +1590,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio otherInArgs := make([]Vector, len(e.Args)) for i, e := range e.Args { if i != matrixArgIndex { - val, ws := ev.eval(e) + val, ws := ev.eval(ctx, e) otherArgs[i] = val.(Matrix) otherInArgs[i] = Vector{Sample{}} inArgs[i] = otherInArgs[i] @@ -1609,7 +1604,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio sel := arg.(*parser.MatrixSelector) selVS := sel.VectorSelector.(*parser.VectorSelector) - ws, err := checkAndExpandSeriesSet(ev.ctx, sel) + ws, err := checkAndExpandSeriesSet(ctx, sel) warnings.Merge(ws) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings}) @@ -1639,7 +1634,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio dropName := e.Func.Name != "last_over_time" for i, s := range selVS.Series { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } ev.currentSamples -= len(floats) + totalHPointSize(histograms) @@ -1785,10 +1780,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio return mat, warnings case *parser.ParenExpr: - return ev.eval(e.Expr) + return ev.eval(ctx, e.Expr) case *parser.UnaryExpr: - val, ws := ev.eval(e.Expr) + val, ws := ev.eval(ctx, e.Expr) mat := val.(Matrix) if e.Op == parser.SUB { for i := range mat { @@ -1809,7 +1804,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio case *parser.BinaryExpr: switch lt, rt := e.LHS.Type(), e.RHS.Type(); { case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F) return append(enh.Out, Sample{F: val}), nil }, e.LHS, e.RHS) @@ -1822,39 +1817,39 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio } switch e.Op { case parser.LAND: - return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LOR: - return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LUNLESS: - return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) default: - return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) } case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) } case *parser.NumberLiteral: - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil }) @@ -1862,7 +1857,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio return String{V: e.Val, T: ev.startTimestamp}, nil case *parser.VectorSelector: - ws, err := checkAndExpandSeriesSet(ev.ctx, e) + ws, err := checkAndExpandSeriesSet(ctx, e) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } @@ -1871,7 +1866,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator for i, s := range e.Series { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } chkIter = s.Iterator(chkIter) @@ -1922,14 +1917,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if ev.startTimestamp != ev.endTimestamp { panic(errors.New("cannot do range evaluation of matrix selector")) } - return ev.matrixSelector(e) + return ev.matrixSelector(ctx, e) case *parser.SubqueryExpr: offsetMillis := durationMilliseconds(e.Offset) rangeMillis := durationMilliseconds(e.Range) newEv := &evaluator{ endTimestamp: ev.endTimestamp - offsetMillis, - ctx: ev.ctx, currentSamples: ev.currentSamples, maxSamples: ev.maxSamples, logger: ev.logger, @@ -1959,7 +1953,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio setOffsetForAtModifier(newEv.startTimestamp, e.Expr) } - res, ws := newEv.eval(e.Expr) + res, ws := newEv.eval(ctx, e.Expr) ev.currentSamples = newEv.currentSamples ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples) @@ -1967,14 +1961,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio case *parser.StepInvariantExpr: switch ce := e.Expr.(type) { case *parser.StringLiteral, *parser.NumberLiteral: - return ev.eval(ce) + return ev.eval(ctx, ce) } newEv := &evaluator{ startTimestamp: ev.startTimestamp, endTimestamp: ev.startTimestamp, // Always a single evaluation. interval: ev.interval, - ctx: ev.ctx, currentSamples: ev.currentSamples, maxSamples: ev.maxSamples, logger: ev.logger, @@ -1983,7 +1976,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ev.enableDelayedNameRemoval, } - res, ws := newEv.eval(e.Expr) + res, ws := newEv.eval(ctx, e.Expr) ev.currentSamples = newEv.currentSamples ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { @@ -2059,8 +2052,8 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) { return getFPointSlice(numSteps) } -func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { - ws, err := checkAndExpandSeriesSet(ev.ctx, vs) +func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(ctx context.Context, vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { + ws, err := checkAndExpandSeriesSet(ctx, vs) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } @@ -2071,7 +2064,7 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) } - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { if vs.Timestamp != nil { // This is a special case for "timestamp()" when the @ modifier is used, to ensure that // we return a point for each time step in this case. @@ -2207,7 +2200,7 @@ func putMatrixSelectorHPointSlice(p []HPoint) { } // matrixSelector evaluates a *parser.MatrixSelector expression. -func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) { +func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSelector) (Matrix, annotations.Annotations) { var ( vs = node.VectorSelector.(*parser.VectorSelector) @@ -2218,7 +2211,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota it = storage.NewBuffer(durationMilliseconds(node.Range)) ) - ws, err := checkAndExpandSeriesSet(ev.ctx, node) + ws, err := checkAndExpandSeriesSet(ctx, node) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } @@ -2226,7 +2219,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota var chkIter chunkenc.Iterator series := vs.Series for i, s := range series { - if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } chkIter = s.Iterator(chkIter) diff --git a/promql/functions.go b/promql/functions.go index 9fa7fbe190..8141d2a9e6 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,6 +14,7 @@ package promql import ( + "context" "errors" "fmt" "math" @@ -1463,7 +1464,7 @@ func funcChanges(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelp } // label_replace function operates only on series; does not look at timestamps or values. -func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, annotations.Annotations) { +func (ev *evaluator) evalLabelReplace(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) { var ( dst = stringFromArg(args[1]) repl = stringFromArg(args[2]) @@ -1479,7 +1480,7 @@ func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, an panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst)) } - val, ws := ev.eval(args[0]) + val, ws := ev.eval(ctx, args[0]) matrix := val.(Matrix) lb := labels.NewBuilder(labels.EmptyLabels()) @@ -1520,7 +1521,7 @@ func funcVector(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelpe } // label_join function operates only on series; does not look at timestamps or values. -func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annotations.Annotations) { +func (ev *evaluator) evalLabelJoin(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) { var ( dst = stringFromArg(args[1]) sep = stringFromArg(args[2]) @@ -1537,7 +1538,7 @@ func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annot panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst)) } - val, ws := ev.eval(args[0]) + val, ws := ev.eval(ctx, args[0]) matrix := val.(Matrix) srcVals := make([]string, len(srcLabels)) lb := labels.NewBuilder(labels.EmptyLabels())