diff --git a/promql/ast.go b/promql/ast.go index ccfd9cac98..af3ed4189f 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -132,9 +132,8 @@ type MatrixSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series iterators are populated at query preparation time. - series []storage.Series - iterators []*storage.BufferedSeriesIterator + // The series are populated at query preparation time. + series []storage.Series } // NumberLiteral represents a number. @@ -166,9 +165,8 @@ type VectorSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series iterators are populated at query preparation time. - series []storage.Series - iterators []*storage.BufferedSeriesIterator + // The series are populated at query preparation time. + series []storage.Series } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } diff --git a/promql/bench_test.go b/promql/bench_test.go index cd91854513..c109500217 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -13,36 +13,183 @@ package promql -import "testing" +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" -// A Benchmark holds context for running a unit test as a benchmark. -type Benchmark struct { - b *testing.B - t *Test - iterCount int -} + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) -// NewBenchmark returns an initialized empty Benchmark. -func NewBenchmark(b *testing.B, input string) *Benchmark { - t, err := NewTest(b, input) - if err != nil { - b.Fatalf("Unable to run benchmark: %s", err) +func BenchmarkRangeQuery(b *testing.B) { + storage := testutil.NewStorage(b) + defer storage.Close() + engine := NewEngine(nil, nil, 10, 100*time.Second) + + metrics := []labels.Labels{} + metrics = append(metrics, labels.FromStrings("__name__", "a_one")) + metrics = append(metrics, labels.FromStrings("__name__", "b_one")) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_one", "le", strconv.Itoa(j))) } - return &Benchmark{ - b: b, - t: t, - } -} + metrics = append(metrics, labels.FromStrings("__name__", "h_one", "le", "+Inf")) -// Run runs the benchmark. -func (b *Benchmark) Run() { - defer b.t.Close() - b.b.ReportAllocs() - b.b.ResetTimer() - for i := 0; i < b.b.N; i++ { - if err := b.t.RunAsBenchmark(b); err != nil { - b.b.Error(err) + for i := 0; i < 10; i++ { + metrics = append(metrics, labels.FromStrings("__name__", "a_ten", "l", strconv.Itoa(i))) + metrics = append(metrics, labels.FromStrings("__name__", "b_ten", "l", strconv.Itoa(i))) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_ten", "l", strconv.Itoa(i), "le", strconv.Itoa(j))) } - b.iterCount++ + metrics = append(metrics, labels.FromStrings("__name__", "h_ten", "l", strconv.Itoa(i), "le", "+Inf")) + } + + for i := 0; i < 100; i++ { + metrics = append(metrics, labels.FromStrings("__name__", "a_hundred", "l", strconv.Itoa(i))) + metrics = append(metrics, labels.FromStrings("__name__", "b_hundred", "l", strconv.Itoa(i))) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_hundred", "l", strconv.Itoa(i), "le", strconv.Itoa(j))) + } + metrics = append(metrics, labels.FromStrings("__name__", "h_hundred", "l", strconv.Itoa(i), "le", "+Inf")) + } + refs := make([]uint64, len(metrics)) + + // A day of data plus 10k steps. + numIntervals := 8640 + 10000 + + for s := 0; s < numIntervals; s += 1 { + a, err := storage.Appender() + if err != nil { + b.Fatal(err) + } + ts := int64(s * 10000) // 10s interval. + for i, metric := range metrics { + err := a.AddFast(metric, refs[i], ts, float64(s)) + if err != nil { + refs[i], _ = a.Add(metric, ts, float64(s)) + } + } + if err := a.Commit(); err != nil { + b.Fatal(err) + } + } + + type benchCase struct { + expr string + steps int + } + cases := []benchCase{ + // Simple rate. + { + expr: "rate(a_X[1m])", + }, + { + expr: "rate(a_X[1m])", + steps: 10000, + }, + // Holt-Winters and long ranges. + { + expr: "holt_winters(a_X[1d], 0.3, 0.3)", + }, + { + expr: "changes(a_X[1d])", + }, + { + expr: "rate(a_X[1d])", + }, + // Unary operators. + { + expr: "-a_X", + }, + // Binary operators. + { + expr: "a_X - b_X", + }, + { + expr: "a_X - b_X", + steps: 10000, + }, + { + expr: "a_X and b_X{l=~'.*[0-4]$'}", + }, + { + expr: "a_X or b_X{l=~'.*[0-4]$'}", + }, + { + expr: "a_X unless b_X{l=~'.*[0-4]$'}", + }, + // Simple functions. + { + expr: "abs(a_X)", + }, + { + expr: "label_replace(a_X, 'l2', '$1', 'l', '(.*)')", + }, + { + expr: "label_join(a_X, 'l2', '-', 'l', 'l')", + }, + // Combinations. + { + expr: "rate(a_X[1m]) + rate(b_X[1m])", + }, + { + expr: "sum without (l)(rate(a_X[1m]))", + }, + { + expr: "sum without (l)(rate(a_X[1m])) / sum without (l)(rate(b_X[1m]))", + }, + { + expr: "histogram_quantile(0.9, rate(h_X[5m]))", + }, + } + + // X in an expr will be replaced by different metric sizes. + tmp := []benchCase{} + for _, c := range cases { + if !strings.Contains(c.expr, "X") { + tmp = append(tmp, c) + } else { + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "one", -1), steps: c.steps}) + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "ten", -1), steps: c.steps}) + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "hundred", -1), steps: c.steps}) + } + } + cases = tmp + + // No step will be replaced by cases with the standard step. + tmp = []benchCase{} + for _, c := range cases { + if c.steps != 0 { + tmp = append(tmp, c) + } else { + tmp = append(tmp, benchCase{expr: c.expr, steps: 1}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 10}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 100}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 1000}) + } + } + cases = tmp + for _, c := range cases { + name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps) + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + qry, err := engine.NewRangeQuery( + storage, c.expr, + time.Unix(int64((numIntervals-c.steps)*10), 0), + time.Unix(int64(numIntervals*10), 0), time.Second*10) + if err != nil { + b.Fatal(err) + } + res := qry.Exec(context.Background()) + if res.Err != nil { + b.Fatal(res.Err) + } + qry.Close() + } + }) } } diff --git a/promql/engine.go b/promql/engine.go index 38389f99cc..aeea2b5d34 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "regexp" "runtime" "sort" "strconv" @@ -53,7 +54,6 @@ type engineMetrics struct { queryQueueTime prometheus.Summary queryPrepareTime prometheus.Summary queryInnerEval prometheus.Summary - queryResultAppend prometheus.Summary queryResultSort prometheus.Summary } @@ -78,8 +78,10 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele // A Query is derived from an a raw query string and can be run against an engine // it is associated with. type Query interface { - // Exec processes the query and + // Exec processes the query. Can only be called once. Exec(ctx context.Context) *Result + // Close recovers memory used by the query result. + Close() // Statement returns the parsed statement of the query. Statement() Statement // Stats returns statistics about the lifetime of the query. @@ -98,6 +100,8 @@ type query struct { stmt Statement // Timer stats for the query execution. stats *stats.TimerGroup + // Result matrix for reuse. + matrix Matrix // Cancellation function for the query. cancel func() @@ -122,6 +126,13 @@ func (q *query) Cancel() { } } +// Close implements the Query interface. +func (q *query) Close() { + for _, s := range q.matrix { + putPointSlice(s.Points) + } +} + // Exec implements the Query interface. func (q *query) Exec(ctx context.Context) *Result { if span := opentracing.SpanFromContext(ctx); span != nil { @@ -199,13 +210,6 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, Help: "Query timings", ConstLabels: prometheus.Labels{"slice": "inner_eval"}, }), - queryResultAppend: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "result_append"}, - }), queryResultSort: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -222,7 +226,6 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, metrics.maxConcurrentQueries, metrics.queryInnerEval, metrics.queryPrepareTime, - metrics.queryResultAppend, metrics.queryResultSort, ) } @@ -352,11 +355,11 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - querier, err := ng.populateIterators(ctx, query.queryable, s) + querier, err := ng.populateSeries(ctx, query.queryable, s) prepareTimer.Stop() ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) - // XXX(fabxc): the querier returned by populateIterators might be instantiated + // XXX(fabxc): the querier returned by populateSeries might be instantiated // we must not return without closing irrespective of the error. // TODO: make this semantically saner. if querier != nil { @@ -368,13 +371,15 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() - // Instant evaluation. + // Instant evaluation. This is executed as a range evaluation with one step. if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ - Timestamp: start, - ctx: ctx, - logger: ng.logger, + startTimestamp: start, + endTimestamp: start, + interval: 1, + ctx: ctx, + logger: ng.logger, } val, err := evaluator.Eval(s.Expr) if err != nil { @@ -383,84 +388,52 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( evalTimer.Stop() ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) - // Point might have a different timestamp, force it to the evaluation - // timestamp as that is when we ran the evaluation. - switch v := val.(type) { - case Scalar: - v.T = start - case Vector: - for i := range v { - v[i].Point.T = start - } - } - return val, nil - } - numSteps := int(s.End.Sub(s.Start) / s.Interval) - - // Range evaluation. - Seriess := map[uint64]Series{} - for ts := s.Start; !ts.After(s.End); ts = ts.Add(s.Interval) { - - if err := contextDone(ctx, "range evaluation"); err != nil { - return nil, err - } - - t := timeMilliseconds(ts) - evaluator := &evaluator{ - Timestamp: t, - ctx: ctx, - logger: ng.logger, - } - val, err := evaluator.Eval(s.Expr) - if err != nil { - return nil, err - } - - switch v := val.(type) { - case Scalar: - // As the expression type does not change we can safely default to 0 - // as the fingerprint for Scalar expressions. - ss, ok := Seriess[0] - if !ok { - ss = Series{Points: make([]Point, 0, numSteps)} - Seriess[0] = ss - } - ss.Points = append(ss.Points, Point{V: v.V, T: t}) - Seriess[0] = ss - case Vector: - for _, sample := range v { - h := sample.Metric.Hash() - ss, ok := Seriess[h] - if !ok { - ss = Series{ - Metric: sample.Metric, - Points: make([]Point, 0, numSteps), - } - Seriess[h] = ss - } - sample.Point.T = t - ss.Points = append(ss.Points, sample.Point) - Seriess[h] = ss - } - default: + mat, ok := val.(Matrix) + if !ok { panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } + query.matrix = mat + switch s.Expr.Type() { + case ValueTypeVector: + // Convert matrix with one value per series into vector. + vector := make(Vector, len(mat)) + for i, s := range mat { + // Point might have a different timestamp, force it to the evaluation + // timestamp as that is when we ran the evaluation. + vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}} + } + return vector, nil + case ValueTypeScalar: + return Scalar{V: mat[0].Points[0].V, T: start}, nil + case ValueTypeMatrix: + return mat, nil + default: + panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) + } + + } + + // Range evaluation. + evaluator := &evaluator{ + startTimestamp: timeMilliseconds(s.Start), + endTimestamp: timeMilliseconds(s.End), + interval: durationMilliseconds(s.Interval), + ctx: ctx, + logger: ng.logger, + } + val, err := evaluator.Eval(s.Expr) + if err != nil { + return nil, err } evalTimer.Stop() ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) - if err := contextDone(ctx, "expression evaluation"); err != nil { - return nil, err + mat, ok := val.(Matrix) + if !ok { + panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } - - appendTimer := query.stats.GetTimer(stats.ResultAppendTime).Start() - mat := Matrix{} - for _, ss := range Seriess { - mat = append(mat, ss) - } - appendTimer.Stop() - ng.metrics.queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds()) + query.matrix = mat if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, err @@ -476,7 +449,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return mat, nil } -func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { +func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration Inspect(s.Expr, func(node Node, _ []Node) bool { switch n := node.(type) { @@ -526,10 +499,6 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return false } - for _, s := range n.series { - it := storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) - n.iterators = append(n.iterators, it) - } case *MatrixSelector: params.Func = extractFuncFromPath(path) @@ -544,10 +513,6 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return false } - for _, s := range n.series { - it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range)) - n.iterators = append(n.iterators, it) - } } return true }) @@ -580,31 +545,26 @@ func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { return res, it.Err() } -// An evaluator evaluates given expressions at a fixed timestamp. It is attached to an -// engine through which it connects to a querier and reports errors. On timeout or -// cancellation of its context it terminates. +// An evaluator evaluates given expressions over given fixed timestamps. It +// is attached to an engine through which it connects to a querier and reports +// errors. On timeout or cancellation of its context it terminates. type evaluator struct { ctx context.Context - Timestamp int64 // time in milliseconds + startTimestamp int64 // Start time in milliseconds. - finalizers []func() + endTimestamp int64 // End time in milliseconds. + interval int64 // Interval in milliseconds. logger log.Logger } -func (ev *evaluator) close() { - for _, f := range ev.finalizers { - f() - } -} - -// fatalf causes a panic with the input formatted into an error. +// errorf causes a panic with the input formatted into an error. func (ev *evaluator) errorf(format string, args ...interface{}) { ev.error(fmt.Errorf(format, args...)) } -// fatal causes a panic with the given error. +// error causes a panic with the given error. func (ev *evaluator) error(err error) { panic(err) } @@ -615,90 +575,171 @@ func (ev *evaluator) recover(errp *error) { if e == nil { return } - if _, ok := e.(runtime.Error); ok { + if err, ok := e.(runtime.Error); ok { // Print the stack trace but do not inhibit the running application. buf := make([]byte, 64<<10) buf = buf[:runtime.Stack(buf, false)] level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf)) - *errp = fmt.Errorf("unexpected error") + *errp = fmt.Errorf("unexpected error: %s", err) } else { *errp = e.(error) } } -// evalScalar attempts to evaluate e to a Scalar value and errors otherwise. -func (ev *evaluator) evalScalar(e Expr) Scalar { - val := ev.eval(e) - sv, ok := val.(Scalar) - if !ok { - ev.errorf("expected Scalar but got %s", documentedType(val.Type())) - } - return sv -} - -// evalVector attempts to evaluate e to a Vector value and errors otherwise. -func (ev *evaluator) evalVector(e Expr) Vector { - val := ev.eval(e) - vec, ok := val.(Vector) - if !ok { - ev.errorf("expected instant Vector but got %s", documentedType(val.Type())) - } - return vec -} - -// evalInt attempts to evaluate e into an integer and errors otherwise. -func (ev *evaluator) evalInt(e Expr) int64 { - sc := ev.evalScalar(e) - if !convertibleToInt64(sc.V) { - ev.errorf("Scalar value %v overflows int64", sc.V) - } - return int64(sc.V) -} - -// evalFloat attempts to evaluate e into a float and errors otherwise. -func (ev *evaluator) evalFloat(e Expr) float64 { - sc := ev.evalScalar(e) - return float64(sc.V) -} - -// evalMatrix attempts to evaluate e into a Matrix and errors otherwise. -// The error message uses the term "range Vector" to match the user facing -// documentation. -func (ev *evaluator) evalMatrix(e Expr) Matrix { - val := ev.eval(e) - mat, ok := val.(Matrix) - if !ok { - ev.errorf("expected range Vector but got %s", documentedType(val.Type())) - } - return mat -} - -// evalString attempts to evaluate e to a string value and errors otherwise. -func (ev *evaluator) evalString(e Expr) String { - val := ev.eval(e) - sv, ok := val.(String) - if !ok { - ev.errorf("expected string but got %s", documentedType(val.Type())) - } - return sv -} - -// evalOneOf evaluates e and errors unless the result is of one of the given types. -func (ev *evaluator) evalOneOf(e Expr, t1, t2 ValueType) Value { - val := ev.eval(e) - if val.Type() != t1 && val.Type() != t2 { - ev.errorf("expected %s or %s but got %s", documentedType(t1), documentedType(t2), documentedType(val.Type())) - } - return val -} - func (ev *evaluator) Eval(expr Expr) (v Value, err error) { defer ev.recover(&err) - defer ev.close() return ev.eval(expr), nil } +// Extra information and caches for evaluating a single node across steps. +type EvalNodeHelper struct { + // Evaluation timestamp. + ts int64 + // Vector that can be used for output. + out Vector + + // Caches. + // dropMetricName and label_*. + dmn map[uint64]labels.Labels + // signatureFunc. + sigf map[uint64]uint64 + // funcHistogramQuantile. + signatureToMetricWithBuckets map[uint64]*metricWithBuckets + // label_replace. + regex *regexp.Regexp + + // For binary vector matching. + rightSigs map[uint64]Sample + matchedSigs map[uint64]map[uint64]struct{} + resultMetric map[uint64]labels.Labels +} + +// dropMetricName is a cached version of dropMetricName. +func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels { + if enh.dmn == nil { + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) + } + h := l.Hash() + ret, ok := enh.dmn[h] + if ok { + return ret + } + ret = dropMetricName(l) + enh.dmn[h] = ret + return ret +} + +// signatureFunc is a cached version of signatureFunc. +func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) uint64 { + if enh.sigf == nil { + enh.sigf = make(map[uint64]uint64, len(enh.out)) + } + f := signatureFunc(on, names...) + return func(l labels.Labels) uint64 { + h := l.Hash() + ret, ok := enh.sigf[h] + if ok { + return ret + } + ret = f(l) + enh.sigf[h] = ret + return ret + } +} + +// rangeEval evaluates the given expressions, and then for each step calls +// the given function with the values computed for each expression at that +// step. The return value is the combination into time series of of all the +// function call results. +func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ...Expr) Matrix { + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + matrixes := make([]Matrix, len(exprs)) + origMatrixes := make([]Matrix, len(exprs)) + for i, e := range exprs { + // Functions will take string arguments from the expressions, not the values. + if e != nil && e.Type() != ValueTypeString { + matrixes[i] = ev.eval(e).(Matrix) + + // Keep a copy of the original point slices so that they + // can be returned to the pool. + origMatrixes[i] = make(Matrix, len(matrixes[i])) + copy(origMatrixes[i], matrixes[i]) + } + } + + vectors := make([]Vector, len(exprs)) // Input vectors for the function. + args := make([]Value, len(exprs)) // Argument to function. + // Create an output vector that is as big as the input matrix with + // the most time series. + biggestLen := 1 + for i := range exprs { + vectors[i] = make(Vector, 0, len(matrixes[i])) + if len(matrixes[i]) > biggestLen { + biggestLen = len(matrixes[i]) + } + } + enh := &EvalNodeHelper{out: make(Vector, 0, biggestLen)} + seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + // Gather input vectors for this timestamp. + for i := range exprs { + vectors[i] = vectors[i][:0] + for si, series := range matrixes[i] { + for _, point := range series.Points { + if point.T == ts { + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].Points = series.Points[1:] + } + break + } + } + args[i] = vectors[i] + } + // Make the function call. + enh.ts = ts + result := f(args, enh) + enh.out = result[:0] // Reuse result vector. + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + mat := make(Matrix, len(result)) + for i, s := range result { + s.Point.T = ts + mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}} + } + return mat + } + // Add samples in output vector to output series. + for _, sample := range result { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if !ok { + ss = Series{ + Metric: sample.Metric, + Points: getPointSlice(numSteps), + } + } + sample.Point.T = ts + ss.Points = append(ss.Points, sample.Point) + seriess[h] = ss + } + } + // Reuse the original point slices. + for _, m := range origMatrixes { + for _, s := range m { + putPointSlice(s.Points) + } + } + // Assemble the output matrix. + mat := make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + mat = append(mat, ss) + } + return mat +} + // eval evaluates the given expression as the given AST expression node requires. func (ev *evaluator) eval(expr Expr) Value { // This is the top-level evaluation method. @@ -706,119 +747,273 @@ func (ev *evaluator) eval(expr Expr) Value { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 switch e := expr.(type) { case *AggregateExpr: - Vector := ev.evalVector(e.Expr) - return ev.aggregation(e.Op, e.Grouping, e.Without, e.Param, Vector) - - case *BinaryExpr: - lhs := ev.evalOneOf(e.LHS, ValueTypeScalar, ValueTypeVector) - rhs := ev.evalOneOf(e.RHS, ValueTypeScalar, ValueTypeVector) - - switch lt, rt := lhs.Type(), rhs.Type(); { - case lt == ValueTypeScalar && rt == ValueTypeScalar: - return Scalar{ - V: scalarBinop(e.Op, lhs.(Scalar).V, rhs.(Scalar).V), - T: ev.Timestamp, - } - - case lt == ValueTypeVector && rt == ValueTypeVector: - switch e.Op { - case itemLAND: - return ev.VectorAnd(lhs.(Vector), rhs.(Vector), e.VectorMatching) - case itemLOR: - return ev.VectorOr(lhs.(Vector), rhs.(Vector), e.VectorMatching) - case itemLUnless: - return ev.VectorUnless(lhs.(Vector), rhs.(Vector), e.VectorMatching) - default: - return ev.VectorBinop(e.Op, lhs.(Vector), rhs.(Vector), e.VectorMatching, e.ReturnBool) - } - case lt == ValueTypeVector && rt == ValueTypeScalar: - return ev.VectorscalarBinop(e.Op, lhs.(Vector), rhs.(Scalar), false, e.ReturnBool) - - case lt == ValueTypeScalar && rt == ValueTypeVector: - return ev.VectorscalarBinop(e.Op, rhs.(Vector), lhs.(Scalar), true, e.ReturnBool) + if s, ok := e.Param.(*StringLiteral); ok { + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh) + }, e.Expr) } + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + var param float64 + if e.Param != nil { + param = v[0].(Vector)[0].V + } + return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh) + }, e.Param, e.Expr) case *Call: - return e.Func.Call(ev, e.Args) + if e.Func.Name == "timestamp" { + // Matrix evaluation always returns the evaluation time, + // so this function needs special handling when given + // a vector selector. + vs, ok := e.Args[0].(*VectorSelector) + if ok { + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return e.Func.Call([]Value{ev.vectorSelector(vs, enh.ts)}, e.Args, enh) + }) + } + } + // Check if the function has a matrix argument. + var matrixArgIndex int + var matrixArg bool + for i, a := range e.Args { + _, ok := a.(*MatrixSelector) + if ok { + matrixArgIndex = i + matrixArg = true + break + } + } + if !matrixArg { + // Does not have a matrix argument. + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return e.Func.Call(v, e.Args, enh) + }, e.Args...) + } - case *MatrixSelector: - return ev.matrixSelector(e) + inArgs := make([]Value, len(e.Args)) + // Evaluate any non-matrix arguments. + otherArgs := make([]Matrix, len(e.Args)) + otherInArgs := make([]Vector, len(e.Args)) + for i, e := range e.Args { + if i != matrixArgIndex { + otherArgs[i] = ev.eval(e).(Matrix) + otherInArgs[i] = Vector{Sample{}} + inArgs[i] = otherInArgs[i] + } + } - case *NumberLiteral: - return Scalar{V: e.Val, T: ev.Timestamp} + sel := e.Args[matrixArgIndex].(*MatrixSelector) + mat := make(Matrix, 0, len(sel.series)) // Output matrix. + offset := durationMilliseconds(sel.Offset) + selRange := durationMilliseconds(sel.Range) + // Reuse objects across steps to save memory allocations. + points := getPointSlice(16) + inMatrix := make(Matrix, 1) + inArgs[matrixArgIndex] = inMatrix + enh := &EvalNodeHelper{out: make(Vector, 0, 1)} + // Process all the calls for one time series at a time. + var it *storage.BufferedSeriesIterator + for i, s := range sel.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), selRange) + } else { + it.Reset(s.Iterator()) + } + ss := Series{ + // For all range vector functions, the only change to the + // output labels is dropping the metric name so just do + // it once here. + Metric: dropMetricName(sel.series[i].Labels()), + Points: getPointSlice(numSteps), + } + inMatrix[0].Metric = sel.series[i].Labels() + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { + step++ + // Set the non-matrix arguments. + // They are scalar, so it is safe to use the step number + // when looking up the argument, as there will be no gaps. + for j := range e.Args { + if j != matrixArgIndex { + otherInArgs[j][0].V = otherArgs[j][0].Points[step].V + } + } + maxt := ts - offset + mint := maxt - selRange + // Evaluate the matrix selector for this series for this step. + points = ev.matrixIterSlice(it, mint, maxt, points[:0]) + if len(points) == 0 { + continue + } + inMatrix[0].Points = points + enh.ts = ts + // Make the function call. + outVec := e.Func.Call(inArgs, e.Args, enh) + enh.out = outVec[:0] + if len(outVec) > 0 { + ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts}) + } + } + if len(ss.Points) > 0 { + mat = append(mat, ss) + } + } + putPointSlice(points) + return mat case *ParenExpr: return ev.eval(e.Expr) - case *StringLiteral: - return String{V: e.Val, T: ev.Timestamp} - case *UnaryExpr: - se := ev.evalOneOf(e.Expr, ValueTypeScalar, ValueTypeVector) - // Only + and - are possible operators. + mat := ev.eval(e.Expr).(Matrix) if e.Op == itemSUB { - switch v := se.(type) { - case Scalar: - v.V = -v.V - case Vector: - for i, sv := range v { - v[i].V = -sv.V + for i := range mat { + mat[i].Metric = dropMetricName(mat[i].Metric) + for j := range mat[i].Points { + mat[i].Points[j].V = -mat[i].Points[j].V } } } - return se + return mat + + case *BinaryExpr: + switch lt, rt := e.LHS.Type(), e.RHS.Type(); { + case lt == ValueTypeScalar && rt == ValueTypeScalar: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V) + return append(enh.out, Sample{Point: Point{V: val}}) + }, e.LHS, e.RHS) + case lt == ValueTypeVector && rt == ValueTypeVector: + switch e.Op { + case itemLAND: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) + case itemLOR: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) + case itemLUnless: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) + default: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh) + }, e.LHS, e.RHS) + } + + case lt == ValueTypeVector && rt == ValueTypeScalar: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh) + }, e.LHS, e.RHS) + + case lt == ValueTypeScalar && rt == ValueTypeVector: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh) + }, e.LHS, e.RHS) + } + + case *NumberLiteral: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return append(enh.out, Sample{Point: Point{V: e.Val}}) + }) case *VectorSelector: - return ev.vectorSelector(e) + mat := make(Matrix, 0, len(e.series)) + var it *storage.BufferedSeriesIterator + for i, s := range e.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) + } else { + it.Reset(s.Iterator()) + } + ss := Series{ + Metric: e.series[i].Labels(), + Points: getPointSlice(numSteps), + } + + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + _, v, ok := ev.vectorSelectorSingle(it, e, ts) + if ok { + ss.Points = append(ss.Points, Point{V: v, T: ts}) + } + } + + if len(ss.Points) > 0 { + mat = append(mat, ss) + } + } + return mat + + case *MatrixSelector: + if ev.startTimestamp != ev.endTimestamp { + panic(fmt.Errorf("cannot do range evaluation of matrix selector")) + } + return ev.matrixSelector(e) } + panic(fmt.Errorf("unhandled expression of type: %T", expr)) } // vectorSelector evaluates a *VectorSelector expression. -func (ev *evaluator) vectorSelector(node *VectorSelector) Vector { +func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { var ( - vec = make(Vector, 0, len(node.series)) - refTime = ev.Timestamp - durationMilliseconds(node.Offset) + vec = make(Vector, 0, len(node.series)) ) - for i, it := range node.iterators { - var t int64 - var v float64 - - ok := it.Seek(refTime) - if !ok { - if it.Err() != nil { - ev.error(it.Err()) - } + var it *storage.BufferedSeriesIterator + for i, s := range node.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) + } else { + it.Reset(s.Iterator()) } + t, v, ok := ev.vectorSelectorSingle(it, node, ts) if ok { - t, v = it.Values() + vec = append(vec, Sample{ + Metric: node.series[i].Labels(), + Point: Point{V: v, T: t}, + }) } - peek := 1 - if !ok || t > refTime { - t, v, ok = it.PeekBack(peek) - peek++ - if !ok || t < refTime-durationMilliseconds(LookbackDelta) { - continue - } - } - if value.IsStaleNaN(v) { - continue - } - - vec = append(vec, Sample{ - Metric: node.series[i].Labels(), - Point: Point{V: v, T: t}, - }) } return vec } +// vectorSelectorSingle evaluates a instant vector for the iterator of one time series. +func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool) { + refTime := ts - durationMilliseconds(node.Offset) + var t int64 + var v float64 + + ok := it.Seek(refTime) + if !ok { + if it.Err() != nil { + ev.error(it.Err()) + } + } + + if ok { + t, v = it.Values() + } + + if !ok || t > refTime { + t, v, ok = it.PeekBack(1) + if !ok || t < refTime-durationMilliseconds(LookbackDelta) { + return 0, 0, false + } + } + if value.IsStaleNaN(v) { + return 0, 0, false + } + return t, v, true +} + var pointPool = sync.Pool{} func getPointSlice(sz int) []Point { @@ -833,70 +1028,27 @@ func putPointSlice(p []Point) { pointPool.Put(p[:0]) } -var matrixPool = sync.Pool{} - -func getMatrix(sz int) Matrix { - m := matrixPool.Get() - if m != nil { - return m.(Matrix) - } - return make(Matrix, 0, sz) -} - -func putMatrix(m Matrix) { - matrixPool.Put(m[:0]) -} - // matrixSelector evaluates a *MatrixSelector expression. func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var ( offset = durationMilliseconds(node.Offset) - maxt = ev.Timestamp - offset + maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) - matrix = getMatrix(len(node.series)) - // Write all points into a single slice to avoid lots of tiny allocations. - allPoints = getPointSlice(5 * len(matrix)) + matrix = make(Matrix, 0, len(node.series)) ) - ev.finalizers = append(ev.finalizers, - func() { putPointSlice(allPoints) }, - func() { putMatrix(matrix) }, - ) - - for i, it := range node.iterators { - start := len(allPoints) - + var it *storage.BufferedSeriesIterator + for i, s := range node.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) + } else { + it.Reset(s.Iterator()) + } ss := Series{ Metric: node.series[i].Labels(), } - ok := it.Seek(maxt) - if !ok { - if it.Err() != nil { - ev.error(it.Err()) - } - } - - buf := it.Buffer() - for buf.Next() { - t, v := buf.At() - if value.IsStaleNaN(v) { - continue - } - // Values in the buffer are guaranteed to be smaller than maxt. - if t >= mint { - allPoints = append(allPoints, Point{T: t, V: v}) - } - } - // The seeked sample might also be in the range. - if ok { - t, v := it.Values() - if t == maxt && !value.IsStaleNaN(v) { - allPoints = append(allPoints, Point{T: t, V: v}) - } - } - - ss.Points = allPoints[start:] + ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) if len(ss.Points) > 0 { matrix = append(matrix, ss) @@ -905,13 +1057,42 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { return matrix } -func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector { +// matrixIterSlice evaluates a matrix vector for the iterator of one time series. +func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { + ok := it.Seek(maxt) + if !ok { + if it.Err() != nil { + ev.error(it.Err()) + } + } + + buf := it.Buffer() + for buf.Next() { + t, v := buf.At() + if value.IsStaleNaN(v) { + continue + } + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + out = append(out, Point{T: t, V: v}) + } + } + // The seeked sample might also be in the range. + if ok { + t, v := it.Values() + if t == maxt && !value.IsStaleNaN(v) { + out = append(out, Point{T: t, V: v}) + } + } + return out +} + +func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) - var result Vector // The set of signatures for the right-hand side Vector. rightSigs := map[uint64]struct{}{} // Add all rhs samples to a map so we can easily find matches later. @@ -922,63 +1103,58 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector for _, ls := range lhs { // If there's a matching entry in the right-hand side Vector, add the sample. if _, ok := rightSigs[sigf(ls.Metric)]; ok { - result = append(result, ls) + enh.out = append(enh.out, ls) } } - return result + return enh.out } -func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *VectorMatching) Vector { +func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) - var result Vector leftSigs := map[uint64]struct{}{} // Add everything from the left-hand-side Vector. for _, ls := range lhs { leftSigs[sigf(ls.Metric)] = struct{}{} - result = append(result, ls) + enh.out = append(enh.out, ls) } // Add all right-hand side elements which have not been added from the left-hand side. for _, rs := range rhs { if _, ok := leftSigs[sigf(rs.Metric)]; !ok { - result = append(result, rs) + enh.out = append(enh.out, rs) } } - return result + return enh.out } -func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *VectorMatching) Vector { +func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) rightSigs := map[uint64]struct{}{} for _, rs := range rhs { rightSigs[sigf(rs.Metric)] = struct{}{} } - var result Vector for _, ls := range lhs { if _, ok := rightSigs[sigf(ls.Metric)]; !ok { - result = append(result, ls) + enh.out = append(enh.out, ls) } } - return result + return enh.out } // VectorBinop evaluates a binary operation between two Vectors, excluding set operators. -func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorMatching, returnBool bool) Vector { +func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorMatching, returnBool bool, enh *EvalNodeHelper) Vector { if matching.Card == CardManyToMany { panic("many-to-many only allowed for set operators") } - var ( - result = Vector{} - sigf = signatureFunc(matching.On, matching.MatchingLabels...) - ) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) // The control flow below handles one-to-one or many-to-one matching. // For one-to-many, swap sidedness and account for the swap when calculating @@ -988,7 +1164,14 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM } // All samples from the rhs hashed by the matching label/values. - rightSigs := map[uint64]Sample{} + if enh.rightSigs == nil { + enh.rightSigs = make(map[uint64]Sample, len(enh.out)) + } else { + for k := range enh.rightSigs { + delete(enh.rightSigs, k) + } + } + rightSigs := enh.rightSigs // Add all rhs samples to a map so we can easily find matches later. for _, rs := range rhs { @@ -1004,7 +1187,14 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM // Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one // the value is a set of signatures to detect duplicated result elements. - matchedSigs := map[uint64]map[uint64]struct{}{} + if enh.matchedSigs == nil { + enh.matchedSigs = make(map[uint64]map[uint64]struct{}, len(rightSigs)) + } else { + for k := range enh.matchedSigs { + delete(enh.matchedSigs, k) + } + } + matchedSigs := enh.matchedSigs // For all lhs samples find a respective rhs sample and perform // the binary operation. @@ -1031,7 +1221,7 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM } else if !keep { continue } - metric := resultMetric(ls.Metric, rs.Metric, op, matching) + metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh) insertedSigs, exists := matchedSigs[sig] if matching.Card == CardOneToOne { @@ -1054,12 +1244,12 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM insertedSigs[insertSig] = struct{}{} } - result = append(result, Sample{ + enh.out = append(enh.out, Sample{ Metric: metric, - Point: Point{V: value, T: ev.Timestamp}, + Point: Point{V: value}, }) } - return result + return enh.out } func hashWithoutLabels(lset labels.Labels, names ...string) uint64 { @@ -1109,7 +1299,20 @@ func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 { // resultMetric returns the metric for the given sample(s) based on the Vector // binary operation and the matching options. -func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching) labels.Labels { +func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching, enh *EvalNodeHelper) labels.Labels { + if enh.resultMetric == nil { + enh.resultMetric = make(map[uint64]labels.Labels, len(enh.out)) + } + // op and matching are always the same for a given node, so + // there's no need to include them in the hash key. + // If the lhs and rhs are the same then the xor would be 0, + // so add in one side to protect against that. + lh := lhs.Hash() + h := (lh ^ rhs.Hash()) + lh + if ret, ok := enh.resultMetric[h]; ok { + return ret + } + lb := labels.NewBuilder(lhs) if shouldDropMetricName(op) { @@ -1140,13 +1343,13 @@ func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching) } } - return lb.Labels() + ret := lb.Labels() + enh.resultMetric[h] = ret + return ret } // VectorscalarBinop evaluates a binary operation between a Vector and a Scalar. -func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap, returnBool bool) Vector { - vec := make(Vector, 0, len(lhs)) - +func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector { for _, lhsSample := range lhs { lv, rv := lhsSample.V, rhs.V // lhs always contains the Vector. If the original position was different @@ -1166,12 +1369,12 @@ func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap if keep { lhsSample.V = value if shouldDropMetricName(op) || returnBool { - lhsSample.Metric = dropMetricName(lhsSample.Metric) + lhsSample.Metric = enh.dropMetricName(lhsSample.Metric) } - vec = append(vec, lhsSample) + enh.out = append(enh.out, lhsSample) } } - return vec + return enh.out } func dropMetricName(l labels.Labels) labels.Labels { @@ -1265,23 +1468,27 @@ type groupedAggregation struct { } // aggregation evaluates an aggregation operation on a Vector. -func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param Expr, vec Vector) Vector { +func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector { result := map[uint64]*groupedAggregation{} var k int64 if op == itemTopK || op == itemBottomK { - k = ev.evalInt(param) + f := param.(float64) + if !convertibleToInt64(f) { + ev.errorf("Scalar value %v overflows int64", f) + } + k = int64(f) if k < 1 { return Vector{} } } var q float64 if op == itemQuantile { - q = ev.evalFloat(param) + q = param.(float64) } var valueLabel string if op == itemCountValues { - valueLabel = ev.evalString(param).V + valueLabel = param.(string) if !without { grouping = append(grouping, valueLabel) } @@ -1411,8 +1618,6 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p } // Construct the result Vector from the aggregated groups. - resultVector := make(Vector, 0, len(result)) - for _, aggr := range result { switch op { case itemAvg: @@ -1433,9 +1638,9 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // The heap keeps the lowest value on top, so reverse it. sort.Sort(sort.Reverse(aggr.heap)) for _, v := range aggr.heap { - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: v.Metric, - Point: Point{V: v.V, T: ev.Timestamp}, + Point: Point{V: v.V}, }) } continue // Bypass default append. @@ -1444,9 +1649,9 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // The heap keeps the lowest value on top, so reverse it. sort.Sort(sort.Reverse(aggr.reverseHeap)) for _, v := range aggr.reverseHeap { - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: v.Metric, - Point: Point{V: v.V, T: ev.Timestamp}, + Point: Point{V: v.V}, }) } continue // Bypass default append. @@ -1458,12 +1663,12 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // For other aggregations, we already have the right value. } - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: aggr.labels, - Point: Point{V: aggr.value, T: ev.Timestamp}, + Point: Point{V: aggr.value}, }) } - return resultVector + return enh.out } // btos returns 1 if b is true, 0 otherwise. diff --git a/promql/functions.go b/promql/functions.go index 91160a039a..7e0ca4b16a 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,6 +14,7 @@ package promql import ( + "fmt" "math" "regexp" "sort" @@ -32,29 +33,41 @@ type Function struct { ArgTypes []ValueType Variadic int ReturnType ValueType - Call func(ev *evaluator, args Expressions) Value + + // vals is a list of the evaluated arguments for the function call. + // For range vectors it will be a Matrix with one series, instant vectors a + // Vector, scalars a Vector with one series whose value is the scalar + // value,and nil for strings. + // args are the original arguments to the function, where you can access + // matrixSelectors, vectorSelectors, and StringLiterals. + // enh.out is a pre-allocated empty vector that you may use to accumulate + // output before returning it. The vectors in vals should not be returned.a + // Range vector functions need only return a vector with the right value, + // the metric and timestamp are not neded. + // Instant vector functions need only return a vector with the right values and + // metrics, the timestamp are not needed. + // Scalar results should be returned as the value of a sample in a Vector. + Call func(vals []Value, args Expressions, enh *EvalNodeHelper) Vector } // === time() float64 === -func funcTime(ev *evaluator, args Expressions) Value { - return Scalar{ - V: float64(ev.Timestamp) / 1000, - T: ev.Timestamp, - } +func funcTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return Vector{Sample{Point: Point{ + V: float64(enh.ts) / 1000, + }}} } // extrapolatedRate is a utility function for rate/increase/delta. // It calculates the rate (allowing for counter resets if isCounter is true), // extrapolates if the first/last sample is close to the boundary, and returns // the result as either per-second (if isRate is true) or overall. -func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value { - ms := arg.(*MatrixSelector) +func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector { + ms := args[0].(*MatrixSelector) var ( - matrix = ev.evalMatrix(ms) - rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset) - rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset) - resultVector = make(Vector, 0, len(matrix)) + matrix = vals[0].(Matrix) + rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset) + rangeEnd = enh.ts - durationMilliseconds(ms.Offset) ) for _, samples := range matrix { @@ -118,42 +131,40 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu resultValue = resultValue / ms.Range.Seconds() } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: resultValue, T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: resultValue}, }) } - return resultVector + return enh.out } // === delta(Matrix ValueTypeMatrix) Vector === -func funcDelta(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], false, false) +func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, false, false) } // === rate(node ValueTypeMatrix) Vector === -func funcRate(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], true, true) +func funcRate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, true, true) } // === increase(node ValueTypeMatrix) Vector === -func funcIncrease(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], true, false) +func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, true, false) } // === irate(node ValueTypeMatrix) Vector === -func funcIrate(ev *evaluator, args Expressions) Value { - return instantValue(ev, args[0], true) +func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return instantValue(vals, enh.out, true) } // === idelta(node model.ValMatric) Vector === -func funcIdelta(ev *evaluator, args Expressions) Value { - return instantValue(ev, args[0], false) +func funcIdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return instantValue(vals, enh.out, false) } -func instantValue(ev *evaluator, arg Expr, isRate bool) Value { - resultVector := Vector{} - for _, samples := range ev.evalMatrix(arg) { +func instantValue(vals []Value, out Vector, isRate bool) Vector { + for _, samples := range vals[0].(Matrix) { // No sense in trying to compute a rate without at least two points. Drop // this Vector element. if len(samples.Points) < 2 { @@ -182,12 +193,11 @@ func instantValue(ev *evaluator, arg Expr, isRate bool) Value { resultValue /= float64(sampledInterval) / 1000 } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: resultValue, T: ev.Timestamp}, + out = append(out, Sample{ + Point: Point{V: resultValue}, }) } - return resultVector + return out } // Calculate the trend value at the given index i in raw data d. @@ -195,18 +205,15 @@ func instantValue(ev *evaluator, arg Expr, isRate bool) Value { // The argument "s" is the set of computed smoothed values. // The argument "b" is the set of computed trend factors. // The argument "d" is the set of raw input values. -func calcTrendValue(i int, sf, tf float64, s, b, d []float64) float64 { +func calcTrendValue(i int, sf, tf, s0, s1, b float64) float64 { if i == 0 { - return b[0] + return b } - x := tf * (s[i] - s[i-1]) - y := (1 - tf) * b[i-1] + x := tf * (s1 - s0) + y := (1 - tf) * b - // Cache the computed value. - b[i] = x + y - - return b[i] + return x + y } // Holt-Winters is similar to a weighted moving average, where historical data has exponentially less influence on the current data. @@ -214,29 +221,23 @@ func calcTrendValue(i int, sf, tf float64, s, b, d []float64) float64 { // data. A lower smoothing factor increases the influence of historical data. The trend factor (0 < tf < 1) affects // how trends in historical data will affect the current data. A higher trend factor increases the influence. // of trends. Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing titled: "Double exponential smoothing". -func funcHoltWinters(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) +func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) // The smoothing factor argument. - sf := ev.evalFloat(args[1]) + sf := vals[1].(Vector)[0].V // The trend factor argument. - tf := ev.evalFloat(args[2]) + tf := vals[2].(Vector)[0].V // Sanity check the input. if sf <= 0 || sf >= 1 { - ev.errorf("invalid smoothing factor. Expected: 0 < sf < 1 goT: %f", sf) + panic(fmt.Errorf("invalid smoothing factor. Expected: 0 < sf < 1 goT: %f", sf)) } if tf <= 0 || tf >= 1 { - ev.errorf("invalid trend factor. Expected: 0 < tf < 1 goT: %f", sf) + panic(fmt.Errorf("invalid trend factor. Expected: 0 < tf < 1 goT: %f", sf)) } - // Make an output Vector large enough to hold the entire result. - resultVector := make(Vector, 0, len(mat)) - - // Create scratch values. - var s, b, d []float64 - var l int for _, samples := range mat { l = len(samples.Points) @@ -246,144 +247,130 @@ func funcHoltWinters(ev *evaluator, args Expressions) Value { continue } - // Resize scratch values. - if l != len(s) { - s = make([]float64, l) - b = make([]float64, l) - d = make([]float64, l) - } - - // Fill in the d values with the raw values from the input. - for i, v := range samples.Points { - d[i] = v.V - } - + var s0, s1, b float64 // Set initial values. - s[0] = d[0] - b[0] = d[1] - d[0] + s1 = samples.Points[0].V + b = samples.Points[1].V - samples.Points[0].V // Run the smoothing operation. var x, y float64 - for i := 1; i < len(d); i++ { + for i := 1; i < l; i++ { // Scale the raw value against the smoothing factor. - x = sf * d[i] + x = sf * samples.Points[i].V // Scale the last smoothed value with the trend at this point. - y = (1 - sf) * (s[i-1] + calcTrendValue(i-1, sf, tf, s, b, d)) + b = calcTrendValue(i-1, sf, tf, s0, s1, b) + y = (1 - sf) * (s1 + b) - s[i] = x + y + s0, s1 = s1, x+y } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: s[len(s)-1], T: ev.Timestamp}, // The last value in the Vector is the smoothed result. + enh.out = append(enh.out, Sample{ + Point: Point{V: s1}, }) } - return resultVector + return enh.out } // === sort(node ValueTypeVector) Vector === -func funcSort(ev *evaluator, args Expressions) Value { +func funcSort(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { // NaN should sort to the bottom, so take descending sort with NaN first and // reverse it. - byValueSorter := vectorByReverseValueHeap(ev.evalVector(args[0])) + byValueSorter := vectorByReverseValueHeap(vals[0].(Vector)) sort.Sort(sort.Reverse(byValueSorter)) return Vector(byValueSorter) } // === sortDesc(node ValueTypeVector) Vector === -func funcSortDesc(ev *evaluator, args Expressions) Value { +func funcSortDesc(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { // NaN should sort to the bottom, so take ascending sort with NaN first and // reverse it. - byValueSorter := vectorByValueHeap(ev.evalVector(args[0])) + byValueSorter := vectorByValueHeap(vals[0].(Vector)) sort.Sort(sort.Reverse(byValueSorter)) return Vector(byValueSorter) } // === clamp_max(Vector ValueTypeVector, max Scalar) Vector === -func funcClampMax(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - max := ev.evalFloat(args[1]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Min(max, float64(el.V)) +func funcClampMax(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + max := vals[1].(Vector)[0].Point.V + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: math.Min(max, float64(el.V))}, + }) } - return vec + return enh.out } // === clamp_min(Vector ValueTypeVector, min Scalar) Vector === -func funcClampMin(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - min := ev.evalFloat(args[1]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Max(min, float64(el.V)) +func funcClampMin(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + min := vals[1].(Vector)[0].Point.V + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: math.Max(min, float64(el.V))}, + }) } - return vec + return enh.out } // === round(Vector ValueTypeVector, toNearest=1 Scalar) Vector === -func funcRound(ev *evaluator, args Expressions) Value { +func funcRound(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) // round returns a number rounded to toNearest. // Ties are solved by rounding up. toNearest := float64(1) if len(args) >= 2 { - toNearest = ev.evalFloat(args[1]) + toNearest = vals[1].(Vector)[0].Point.V } // Invert as it seems to cause fewer floating point accuracy issues. toNearestInverse := 1.0 / toNearest - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse + for _, el := range vec { + v := math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: v}, + }) } - return vec + return enh.out } // === Scalar(node ValueTypeVector) Scalar === -func funcScalar(ev *evaluator, args Expressions) Value { - v := ev.evalVector(args[0]) +func funcScalar(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + v := vals[0].(Vector) if len(v) != 1 { - return Scalar{ - V: math.NaN(), - T: ev.Timestamp, - } - } - return Scalar{ - V: v[0].V, - T: ev.Timestamp, + return append(enh.out, Sample{ + Point: Point{V: math.NaN()}, + }) } + return append(enh.out, Sample{ + Point: Point{V: v[0].V}, + }) } -func aggrOverTime(ev *evaluator, args Expressions, aggrFn func([]Point) float64) Value { - mat := ev.evalMatrix(args[0]) - resultVector := Vector{} +func aggrOverTime(vals []Value, enh *EvalNodeHelper, aggrFn func([]Point) float64) Vector { + mat := vals[0].(Matrix) for _, el := range mat { if len(el.Points) == 0 { continue } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(el.Metric), - Point: Point{V: aggrFn(el.Points), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: aggrFn(el.Points)}, }) } - return resultVector + return enh.out } // === avg_over_time(Matrix ValueTypeMatrix) Vector === -func funcAvgOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcAvgOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum float64 for _, v := range values { sum += v.V @@ -393,27 +380,16 @@ func funcAvgOverTime(ev *evaluator, args Expressions) Value { } // === count_over_time(Matrix ValueTypeMatrix) Vector === -func funcCountOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcCountOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { return float64(len(values)) }) } // === floor(Vector ValueTypeVector) Vector === -func funcFloor(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Floor(float64(el.V)) - } - return vec -} - // === max_over_time(Matrix ValueTypeMatrix) Vector === -func funcMaxOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcMaxOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { max := math.Inf(-1) for _, v := range values { max = math.Max(max, float64(v.V)) @@ -423,8 +399,8 @@ func funcMaxOverTime(ev *evaluator, args Expressions) Value { } // === min_over_time(Matrix ValueTypeMatrix) Vector === -func funcMinOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcMinOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { min := math.Inf(1) for _, v := range values { min = math.Min(min, float64(v.V)) @@ -434,8 +410,8 @@ func funcMinOverTime(ev *evaluator, args Expressions) Value { } // === sum_over_time(Matrix ValueTypeMatrix) Vector === -func funcSumOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcSumOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum float64 for _, v := range values { sum += v.V @@ -445,32 +421,29 @@ func funcSumOverTime(ev *evaluator, args Expressions) Value { } // === quantile_over_time(Matrix ValueTypeMatrix) Vector === -func funcQuantileOverTime(ev *evaluator, args Expressions) Value { - q := ev.evalFloat(args[0]) - mat := ev.evalMatrix(args[1]) - resultVector := Vector{} +func funcQuantileOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + q := vals[0].(Vector)[0].V + mat := vals[1].(Matrix) for _, el := range mat { if len(el.Points) == 0 { continue } - el.Metric = dropMetricName(el.Metric) values := make(vectorByValueHeap, 0, len(el.Points)) for _, v := range el.Points { values = append(values, Sample{Point: Point{V: v.V}}) } - resultVector = append(resultVector, Sample{ - Metric: el.Metric, - Point: Point{V: quantile(q, values), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: quantile(q, values)}, }) } - return resultVector + return enh.out } // === stddev_over_time(Matrix ValueTypeMatrix) Vector === -func funcStddevOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcStddevOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum, squaredSum, count float64 for _, v := range values { sum += v.V @@ -483,8 +456,8 @@ func funcStddevOverTime(ev *evaluator, args Expressions) Value { } // === stdvar_over_time(Matrix ValueTypeMatrix) Vector === -func funcStdvarOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcStdvarOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum, squaredSum, count float64 for _, v := range values { sum += v.V @@ -496,22 +469,10 @@ func funcStdvarOverTime(ev *evaluator, args Expressions) Value { }) } -// === abs(Vector ValueTypeVector) Vector === -func funcAbs(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Abs(float64(el.V)) - } - return vec -} - // === absent(Vector ValueTypeVector) Vector === -func funcAbsent(ev *evaluator, args Expressions) Value { - if len(ev.evalVector(args[0])) > 0 { - return Vector{} +func funcAbsent(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + if len(vals[0].(Vector)) > 0 { + return enh.out } m := []labels.Label{} @@ -522,96 +483,73 @@ func funcAbsent(ev *evaluator, args Expressions) Value { } } } - return Vector{ + return append(enh.out, Sample{ Metric: labels.New(m...), - Point: Point{V: 1, T: ev.Timestamp}, - }, + Point: Point{V: 1}, + }) +} + +func simpleFunc(vals []Value, enh *EvalNodeHelper, f func(float64) float64) Vector { + for _, el := range vals[0].(Vector) { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: f(el.V)}, + }) } + return enh.out +} + +// === abs(Vector ValueTypeVector) Vector === +func funcAbs(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Abs) } // === ceil(Vector ValueTypeVector) Vector === -func funcCeil(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] +func funcCeil(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Ceil) +} - el.Metric = dropMetricName(el.Metric) - el.V = math.Ceil(float64(el.V)) - } - return vec +// === floor(Vector ValueTypeVector) Vector === +func funcFloor(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Floor) } // === exp(Vector ValueTypeVector) Vector === -func funcExp(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Exp(float64(el.V)) - } - return vec +func funcExp(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Exp) } // === sqrt(Vector VectorNode) Vector === -func funcSqrt(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Sqrt(float64(el.V)) - } - return vec +func funcSqrt(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Sqrt) } // === ln(Vector ValueTypeVector) Vector === -func funcLn(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log(float64(el.V)) - } - return vec +func funcLn(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log) } // === log2(Vector ValueTypeVector) Vector === -func funcLog2(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log2(float64(el.V)) - } - return vec +func funcLog2(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log2) } // === log10(Vector ValueTypeVector) Vector === -func funcLog10(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log10(float64(el.V)) - } - return vec +func funcLog10(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log10) } // === timestamp(Vector ValueTypeVector) Vector === -func funcTimestamp(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = float64(el.T) / 1000.0 +func funcTimestamp(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: float64(el.T) / 1000}, + }) } - return vec + return enh.out } // linearRegression performs a least-square linear regression analysis on the @@ -640,9 +578,8 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl } // === deriv(node ValueTypeMatrix) Vector === -func funcDeriv(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) - resultVector := make(Vector, 0, len(mat)) +func funcDeriv(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) for _, samples := range mat { // No sense in trying to compute a derivative without at least two points. @@ -655,21 +592,17 @@ func funcDeriv(ev *evaluator, args Expressions) Value { // to avoid floating point accuracy issues, see // https://github.com/prometheus/prometheus/issues/2674 slope, _ := linearRegression(samples.Points, samples.Points[0].T) - resultSample := Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: slope, T: ev.Timestamp}, - } - - resultVector = append(resultVector, resultSample) + enh.out = append(enh.out, Sample{ + Point: Point{V: slope}, + }) } - return resultVector + return enh.out } // === predict_linear(node ValueTypeMatrix, k ValueTypeScalar) Vector === -func funcPredictLinear(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) - resultVector := make(Vector, 0, len(mat)) - duration := ev.evalFloat(args[1]) +func funcPredictLinear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) + duration := vals[1].(Vector)[0].V for _, samples := range mat { // No sense in trying to predict anything without at least two points. @@ -677,23 +610,28 @@ func funcPredictLinear(ev *evaluator, args Expressions) Value { if len(samples.Points) < 2 { continue } - slope, intercept := linearRegression(samples.Points, ev.Timestamp) + slope, intercept := linearRegression(samples.Points, enh.ts) - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: slope*duration + intercept, T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: slope*duration + intercept}, }) } - return resultVector + return enh.out } // === histogram_quantile(k ValueTypeScalar, Vector ValueTypeVector) Vector === -func funcHistogramQuantile(ev *evaluator, args Expressions) Value { - q := ev.evalFloat(args[0]) - inVec := ev.evalVector(args[1]) +func funcHistogramQuantile(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + q := vals[0].(Vector)[0].V + inVec := vals[1].(Vector) + sigf := enh.signatureFunc(false, excludedLabels...) - outVec := Vector{} - signatureToMetricWithBuckets := map[uint64]*metricWithBuckets{} + if enh.signatureToMetricWithBuckets == nil { + enh.signatureToMetricWithBuckets = map[uint64]*metricWithBuckets{} + } else { + for _, v := range enh.signatureToMetricWithBuckets { + v.buckets = v.buckets[:0] + } + } for _, el := range inVec { upperBound, err := strconv.ParseFloat( el.Metric.Get(model.BucketLabel), 64, @@ -703,34 +641,35 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) Value { // TODO(beorn7): Issue a warning somehow. continue } - hash := hashWithoutLabels(el.Metric, excludedLabels...) + hash := sigf(el.Metric) - mb, ok := signatureToMetricWithBuckets[hash] + mb, ok := enh.signatureToMetricWithBuckets[hash] if !ok { el.Metric = labels.NewBuilder(el.Metric). Del(labels.BucketLabel, labels.MetricName). Labels() mb = &metricWithBuckets{el.Metric, nil} - signatureToMetricWithBuckets[hash] = mb + enh.signatureToMetricWithBuckets[hash] = mb } mb.buckets = append(mb.buckets, bucket{upperBound, el.V}) } - for _, mb := range signatureToMetricWithBuckets { - outVec = append(outVec, Sample{ - Metric: mb.metric, - Point: Point{V: bucketQuantile(q, mb.buckets), T: ev.Timestamp}, - }) + for _, mb := range enh.signatureToMetricWithBuckets { + if len(mb.buckets) > 0 { + enh.out = append(enh.out, Sample{ + Metric: mb.metric, + Point: Point{V: bucketQuantile(q, mb.buckets)}, + }) + } } - return outVec + return enh.out } // === resets(Matrix ValueTypeMatrix) Vector === -func funcResets(ev *evaluator, args Expressions) Value { - in := ev.evalMatrix(args[0]) - out := make(Vector, 0, len(in)) +func funcResets(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + in := vals[0].(Matrix) for _, samples := range in { resets := 0 @@ -743,18 +682,16 @@ func funcResets(ev *evaluator, args Expressions) Value { prev = current } - out = append(out, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: float64(resets), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: float64(resets)}, }) } - return out + return enh.out } // === changes(Matrix ValueTypeMatrix) Vector === -func funcChanges(ev *evaluator, args Expressions) Value { - in := ev.evalMatrix(args[0]) - out := make(Vector, 0, len(in)) +func funcChanges(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + in := vals[0].(Matrix) for _, samples := range in { changes := 0 @@ -767,189 +704,214 @@ func funcChanges(ev *evaluator, args Expressions) Value { prev = current } - out = append(out, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: float64(changes), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: float64(changes)}, }) } - return out + return enh.out } // === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector === -func funcLabelReplace(ev *evaluator, args Expressions) Value { +func funcLabelReplace(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { var ( - vector = ev.evalVector(args[0]) - dst = ev.evalString(args[1]).V - repl = ev.evalString(args[2]).V - src = ev.evalString(args[3]).V - regexStr = ev.evalString(args[4]).V + vector = vals[0].(Vector) + dst = args[1].(*StringLiteral).Val + repl = args[2].(*StringLiteral).Val + src = args[3].(*StringLiteral).Val + regexStr = args[4].(*StringLiteral).Val ) - regex, err := regexp.Compile("^(?:" + regexStr + ")$") - if err != nil { - ev.errorf("invalid regular expression in label_replace(): %s", regexStr) - } - if !model.LabelNameRE.MatchString(string(dst)) { - ev.errorf("invalid destination label name in label_replace(): %s", dst) + if enh.regex == nil { + var err error + enh.regex, err = regexp.Compile("^(?:" + regexStr + ")$") + if err != nil { + panic(fmt.Errorf("invalid regular expression in label_replace(): %s", regexStr)) + } + if !model.LabelNameRE.MatchString(string(dst)) { + panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst)) + } + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) } outSet := make(map[uint64]struct{}, len(vector)) - for i := range vector { - el := &vector[i] - - srcVal := el.Metric.Get(src) - indexes := regex.FindStringSubmatchIndex(srcVal) - // If there is no match, no replacement should take place. - if indexes == nil { - continue - } - res := regex.ExpandString([]byte{}, repl, srcVal, indexes) - - lb := labels.NewBuilder(el.Metric).Del(dst) - if len(res) > 0 { - lb.Set(dst, string(res)) - } - el.Metric = lb.Labels() - + for _, el := range vector { h := el.Metric.Hash() - if _, ok := outSet[h]; ok { - ev.errorf("duplicated label set in output of label_replace(): %s", el.Metric) + var outMetric labels.Labels + if l, ok := enh.dmn[h]; ok { + outMetric = l } else { - outSet[h] = struct{}{} + srcVal := el.Metric.Get(src) + indexes := enh.regex.FindStringSubmatchIndex(srcVal) + if indexes == nil { + // If there is no match, no replacement should take place. + outMetric = el.Metric + enh.dmn[h] = outMetric + } else { + res := enh.regex.ExpandString([]byte{}, repl, srcVal, indexes) + + lb := labels.NewBuilder(el.Metric).Del(dst) + if len(res) > 0 { + lb.Set(dst, string(res)) + } + outMetric = lb.Labels() + enh.dmn[h] = outMetric + } + } + + outHash := outMetric.Hash() + if _, ok := outSet[outHash]; ok { + panic(fmt.Errorf("duplicated label set in output of label_replace(): %s", el.Metric)) + } else { + enh.out = append(enh.out, + Sample{ + Metric: outMetric, + Point: Point{V: el.Point.V}, + }) + outSet[outHash] = struct{}{} } } - - return vector + return enh.out } // === Vector(s Scalar) Vector === -func funcVector(ev *evaluator, args Expressions) Value { - return Vector{ +func funcVector(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return append(enh.out, Sample{ Metric: labels.Labels{}, - Point: Point{V: ev.evalFloat(args[0]), T: ev.Timestamp}, - }, - } + Point: Point{V: vals[0].(Vector)[0].V}, + }) } // === label_join(vector model.ValVector, dest_labelname, separator, src_labelname...) Vector === -func funcLabelJoin(ev *evaluator, args Expressions) Value { +func funcLabelJoin(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { var ( - vector = ev.evalVector(args[0]) - dst = ev.evalString(args[1]).V - sep = ev.evalString(args[2]).V + vector = vals[0].(Vector) + dst = args[1].(*StringLiteral).Val + sep = args[2].(*StringLiteral).Val srcLabels = make([]string, len(args)-3) ) + + if enh.dmn == nil { + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) + } + for i := 3; i < len(args); i++ { - src := ev.evalString(args[i]).V + src := args[i].(*StringLiteral).Val if !model.LabelName(src).IsValid() { - ev.errorf("invalid source label name in label_join(): %s", src) + panic(fmt.Errorf("invalid source label name in label_join(): %s", src)) } srcLabels[i-3] = src } if !model.LabelName(dst).IsValid() { - ev.errorf("invalid destination label name in label_join(): %s", dst) + panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst)) } outSet := make(map[uint64]struct{}, len(vector)) - for i := range vector { - el := &vector[i] - - srcVals := make([]string, len(srcLabels)) - for i, src := range srcLabels { - srcVals[i] = el.Metric.Get(src) - } - - lb := labels.NewBuilder(el.Metric) - - strval := strings.Join(srcVals, sep) - if strval == "" { - lb.Del(dst) - } else { - lb.Set(dst, strval) - } - - el.Metric = lb.Labels() + srcVals := make([]string, len(srcLabels)) + for _, el := range vector { h := el.Metric.Hash() - - if _, exists := outSet[h]; exists { - ev.errorf("duplicated label set in output of label_join(): %s", el.Metric) + var outMetric labels.Labels + if l, ok := enh.dmn[h]; ok { + outMetric = l } else { - outSet[h] = struct{}{} + + for i, src := range srcLabels { + srcVals[i] = el.Metric.Get(src) + } + + lb := labels.NewBuilder(el.Metric) + + strval := strings.Join(srcVals, sep) + if strval == "" { + lb.Del(dst) + } else { + lb.Set(dst, strval) + } + + outMetric = lb.Labels() + enh.dmn[h] = outMetric + } + outHash := outMetric.Hash() + + if _, exists := outSet[outHash]; exists { + panic(fmt.Errorf("duplicated label set in output of label_join(): %s", el.Metric)) + } else { + enh.out = append(enh.out, Sample{ + Metric: outMetric, + Point: Point{V: el.Point.V}, + }) + outSet[outHash] = struct{}{} } } - return vector + return enh.out } // Common code for date related functions. -func dateWrapper(ev *evaluator, args Expressions, f func(time.Time) float64) Value { - var v Vector - if len(args) == 0 { - v = Vector{ +func dateWrapper(vals []Value, enh *EvalNodeHelper, f func(time.Time) float64) Vector { + if len(vals) == 0 { + return append(enh.out, Sample{ Metric: labels.Labels{}, - Point: Point{V: float64(ev.Timestamp) / 1000, T: ev.Timestamp}, - }, - } - } else { - v = ev.evalVector(args[0]) + Point: Point{V: f(time.Unix(enh.ts/1000, 0).UTC())}, + }) } - for i := range v { - el := &v[i] - el.Metric = dropMetricName(el.Metric) + for _, el := range vals[0].(Vector) { t := time.Unix(int64(el.V), 0).UTC() - el.V = f(t) + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: f(t)}, + }) } - return v + return enh.out } // === days_in_month(v Vector) Scalar === -func funcDaysInMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDaysInMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) }) } // === day_of_month(v Vector) Scalar === -func funcDayOfMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDayOfMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Day()) }) } // === day_of_week(v Vector) Scalar === -func funcDayOfWeek(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDayOfWeek(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Weekday()) }) } // === hour(v Vector) Scalar === -func funcHour(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcHour(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Hour()) }) } // === minute(v Vector) Scalar === -func funcMinute(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcMinute(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Minute()) }) } // === month(v Vector) Scalar === -func funcMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Month()) }) } // === year(v Vector) Scalar === -func funcYear(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcYear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Year()) }) } diff --git a/promql/functions_test.go b/promql/functions_test.go index eda22b375b..19680eb7f7 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -23,63 +23,6 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -func BenchmarkHoltWinters4Week5Min(b *testing.B) { - input := ` -clear -load 5m - http_requests{path="/foo"} 0+10x8064 - -eval instant at 4w holt_winters(http_requests[4w], 0.3, 0.3) - {path="/foo"} 80640 -` - - bench := NewBenchmark(b, input) - bench.Run() - -} - -func BenchmarkHoltWinters1Week5Min(b *testing.B) { - input := ` -clear -load 5m - http_requests{path="/foo"} 0+10x2016 - -eval instant at 1w holt_winters(http_requests[1w], 0.3, 0.3) - {path="/foo"} 20160 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - -func BenchmarkHoltWinters1Day1Min(b *testing.B) { - input := ` -clear -load 1m - http_requests{path="/foo"} 0+10x1440 - -eval instant at 1d holt_winters(http_requests[1d], 0.3, 0.3) - {path="/foo"} 14400 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - -func BenchmarkChanges1Day1Min(b *testing.B) { - input := ` -clear -load 1m - http_requests{path="/foo"} 0+10x1440 - -eval instant at 1d changes(http_requests[1d]) - {path="/foo"} 1440 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - func TestDeriv(t *testing.T) { // https://github.com/prometheus/prometheus/issues/2674#issuecomment-315439393 // This requires more precision than the usual test system offers, diff --git a/promql/test.go b/promql/test.go index 6bd07cf7ec..0b512881c6 100644 --- a/promql/test.go +++ b/promql/test.go @@ -160,7 +160,7 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { } ts := testStartTime.Add(time.Duration(offset)) - cmd := newEvalCmd(expr, ts, ts, 0) + cmd := newEvalCmd(expr, ts) switch mod { case "ordered": cmd.ordered = true @@ -301,11 +301,9 @@ func (cmd *loadCmd) append(a storage.Appender) error { // evalCmd is a command that evaluates an expression for the given time (range) // and expects a specific result. type evalCmd struct { - expr string - start, end time.Time - interval time.Duration + expr string + start time.Time - instant bool fail, ordered bool metrics map[uint64]labels.Labels @@ -321,13 +319,10 @@ func (e entry) String() string { return fmt.Sprintf("%d: %s", e.pos, e.vals) } -func newEvalCmd(expr string, start, end time.Time, interval time.Duration) *evalCmd { +func newEvalCmd(expr string, start time.Time) *evalCmd { return &evalCmd{ - expr: expr, - start: start, - end: end, - interval: interval, - instant: start == end && interval == 0, + expr: expr, + start: start, metrics: map[uint64]labels.Labels{}, expected: map[uint64]entry{}, @@ -354,37 +349,9 @@ func (ev *evalCmd) expect(pos int, m labels.Labels, vals ...sequenceValue) { func (ev *evalCmd) compareResult(result Value) error { switch val := result.(type) { case Matrix: - if ev.instant { - return fmt.Errorf("received range result on instant evaluation") - } - seen := map[uint64]bool{} - for pos, v := range val { - fp := v.Metric.Hash() - if _, ok := ev.metrics[fp]; !ok { - return fmt.Errorf("unexpected metric %s in result", v.Metric) - } - exp := ev.expected[fp] - if ev.ordered && exp.pos != pos+1 { - return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1) - } - for i, expVal := range exp.vals { - if !almostEqual(expVal.value, v.Points[i].V) { - return fmt.Errorf("expected %v for %s but got %v", expVal, v.Metric, v.Points) - } - } - seen[fp] = true - } - for fp, expVals := range ev.expected { - if !seen[fp] { - return fmt.Errorf("expected metric %s with %v not found", ev.metrics[fp], expVals) - } - } + return fmt.Errorf("received range result on instant evaluation") case Vector: - if !ev.instant { - return fmt.Errorf("received instant result on range evaluation") - } - seen := map[uint64]bool{} for pos, v := range val { fp := v.Metric.Hash() @@ -464,8 +431,7 @@ func (t *Test) exec(tc testCommand) error { } case *evalCmd: - qry, _ := ParseExpr(cmd.expr) - q := t.queryEngine.newQuery(t.storage, qry, cmd.start, cmd.end, cmd.interval) + q, _ := t.queryEngine.NewInstantQuery(t.storage, cmd.expr, cmd.start) res := q.Exec(t.context) if res.Err != nil { if cmd.fail { @@ -473,6 +439,7 @@ func (t *Test) exec(tc testCommand) error { } return fmt.Errorf("error evaluating query %q: %s", cmd.expr, res.Err) } + defer q.Close() if res.Err == nil && cmd.fail { return fmt.Errorf("expected error evaluating query but got none") } @@ -482,6 +449,37 @@ func (t *Test) exec(tc testCommand) error { return fmt.Errorf("error in %s %s: %s", cmd, cmd.expr, err) } + // Check query returns same result in range mode, + /// by checking against the middle step. + q, _ = t.queryEngine.NewRangeQuery(t.storage, cmd.expr, cmd.start.Add(-time.Minute), cmd.start.Add(time.Minute), time.Minute) + rangeRes := q.Exec(t.context) + if rangeRes.Err != nil { + return fmt.Errorf("error evaluating query %q in range mode: %s", cmd.expr, rangeRes.Err) + } + defer q.Close() + if cmd.ordered { + // Ordering isn't defined for range queries. + return nil + } + mat := rangeRes.Value.(Matrix) + vec := make(Vector, 0, len(mat)) + for _, series := range mat { + for _, point := range series.Points { + if point.T == timeMilliseconds(cmd.start) { + vec = append(vec, Sample{Metric: series.Metric, Point: point}) + break + } + } + } + if _, ok := res.Value.(Scalar); ok { + err = cmd.compareResult(Scalar{V: vec[0].Point.V}) + } else { + err = cmd.compareResult(vec) + } + if err != nil { + return fmt.Errorf("error in %s %s rande mode: %s", cmd, cmd.expr, err) + } + default: panic("promql.Test.exec: unknown test command type") } diff --git a/promql/test_test.go b/promql/test_test.go deleted file mode 100644 index 5de250749f..0000000000 --- a/promql/test_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, softwar -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package promql - -// RunAsBenchmark runs the test in benchmark mode. -// This will not count any loads or non eval functions. -func (t *Test) RunAsBenchmark(b *Benchmark) error { - for _, cmd := range t.cmds { - - switch cmd.(type) { - // Only time the "eval" command. - case *evalCmd: - err := t.exec(cmd) - if err != nil { - return err - } - default: - if b.iterCount == 0 { - b.b.StopTimer() - err := t.exec(cmd) - if err != nil { - return err - } - b.b.StartTimer() - } - } - } - return nil -} diff --git a/promql/testdata/operators.test b/promql/testdata/operators.test index 1adb598c16..7bac547801 100644 --- a/promql/testdata/operators.test +++ b/promql/testdata/operators.test @@ -22,6 +22,19 @@ eval instant at 50m 2 - SUM(http_requests) BY (job) {job="api-server"} -998 {job="app-server"} -2598 +eval instant at 50m -http_requests{job="api-server",instance="0",group="production"} + {job="api-server",instance="0",group="production"} -100 + +eval instant at 50m +http_requests{job="api-server",instance="0",group="production"} + http_requests{job="api-server",instance="0",group="production"} 100 + +eval instant at 50m - - - SUM(http_requests) BY (job) + {job="api-server"} -1000 + {job="app-server"} -2600 + +eval instant at 50m - - - 1 + -1 + eval instant at 50m 1000 / SUM(http_requests) BY (job) {job="api-server"} 1 {job="app-server"} 0.38461538461538464 diff --git a/storage/buffer.go b/storage/buffer.go index 77476c6146..7df4027c55 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -30,23 +30,30 @@ type BufferedSeriesIterator struct { // of the current element and the duration of delta before. func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { bit := &BufferedSeriesIterator{ - it: it, - buf: newSampleRing(delta, 16), - lastTime: math.MinInt64, - ok: true, + buf: newSampleRing(delta, 16), } - it.Next() + bit.Reset(it) return bit } +// Reset re-uses the buffer with a new iterator. +func (b *BufferedSeriesIterator) Reset(it SeriesIterator) { + b.it = it + b.lastTime = math.MinInt64 + b.ok = true + b.buf.reset() + it.Next() +} + // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) { return b.buf.nthLast(n) } -// Buffer returns an iterator over the buffered data. +// Buffer returns an iterator over the buffered data. Invalidates previously +// returned iterators. func (b *BufferedSeriesIterator) Buffer() SeriesIterator { return b.buf.iterator() } @@ -118,6 +125,8 @@ type sampleRing struct { i int // position of most recent element in ring buffer f int // position of first element in ring buffer l int // number of elements in buffer + + it sampleRingIterator } func newSampleRing(delta int64, sz int) *sampleRing { @@ -133,8 +142,11 @@ func (r *sampleRing) reset() { r.f = 0 } +// Returns the current iterator. Invalidates previously retuned iterators. func (r *sampleRing) iterator() SeriesIterator { - return &sampleRingIterator{r: r, i: -1} + r.it.r = r + r.it.i = -1 + return &r.it } type sampleRingIterator struct { diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index 9c0c42b325..3fd593cb92 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -23,7 +23,6 @@ const ( ResultSortTime QueryPreparationTime InnerEvalTime - ResultAppendTime ExecQueueTime ExecTotalTime ) @@ -39,8 +38,6 @@ func (s QueryTiming) String() string { return "Query preparation time" case InnerEvalTime: return "Inner eval time" - case ResultAppendTime: - return "Result append time" case ExecQueueTime: return "Exec queue wait time" case ExecTotalTime: @@ -56,7 +53,6 @@ type queryTimings struct { ResultSortTime float64 `json:"resultSortTime"` QueryPreparationTime float64 `json:"queryPreparationTime"` InnerEvalTime float64 `json:"innerEvalTime"` - ResultAppendTime float64 `json:"resultAppendTime"` ExecQueueTime float64 `json:"execQueueTime"` ExecTotalTime float64 `json:"execTotalTime"` } @@ -81,8 +77,6 @@ func NewQueryStats(tg *TimerGroup) *QueryStats { qt.QueryPreparationTime = timer.Duration() case InnerEvalTime: qt.InnerEvalTime = timer.Duration() - case ResultAppendTime: - qt.ResultAppendTime = timer.Duration() case ExecQueueTime: qt.ExecQueueTime = timer.Duration() case ExecTotalTime: diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 503d668725..c9f5ebe025 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -105,7 +105,7 @@ func setCORS(w http.ResponseWriter) { } } -type apiFunc func(r *http.Request) (interface{}, *apiError) +type apiFunc func(r *http.Request) (interface{}, *apiError, func()) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. @@ -156,13 +156,17 @@ func (api *API) Register(r *route.Router) { wrap := func(f apiFunc) http.HandlerFunc { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { setCORS(w) - if data, err := f(r); err != nil { + data, err, finalizer := f(r) + if err != nil { respondError(w, err, data) } else if data != nil { respond(w, data) } else { w.WriteHeader(http.StatusNoContent) } + if finalizer != nil { + finalizer() + } }) return api.ready(httputil.CompressionHandler{ Handler: hf, @@ -200,17 +204,17 @@ type queryData struct { Stats *stats.QueryStats `json:"stats,omitempty"` } -func (api *API) options(r *http.Request) (interface{}, *apiError) { - return nil, nil +func (api *API) options(r *http.Request) (interface{}, *apiError, func()) { + return nil, nil, nil } -func (api *API) query(r *http.Request) (interface{}, *apiError) { +func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { var ts time.Time if t := r.FormValue("time"); t != "" { var err error ts, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { ts = api.now() @@ -221,7 +225,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -230,20 +234,20 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, &apiError{errorCanceled, res.Err} + return nil, &apiError{errorCanceled, res.Err}, qry.Close case promql.ErrQueryTimeout: - return nil, &apiError{errorTimeout, res.Err} + return nil, &apiError{errorTimeout, res.Err}, qry.Close case promql.ErrStorage: - return nil, &apiError{errorInternal, res.Err} + return nil, &apiError{errorInternal, res.Err}, qry.Close } - return nil, &apiError{errorExec, res.Err} + return nil, &apiError{errorExec, res.Err}, qry.Close } // Optional stats field in response if parameter "stats" is not empty. @@ -256,38 +260,38 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil + }, nil, qry.Close } -func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { +func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { start, err := parseTime(r.FormValue("start")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } end, err := parseTime(r.FormValue("end")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } if end.Before(start) { err := errors.New("end timestamp must not be before start time") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } step, err := parseDuration(r.FormValue("step")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } if step <= 0 { err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if end.Sub(start)/step > 11000 { err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx := r.Context() @@ -295,7 +299,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -304,18 +308,18 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, &apiError{errorCanceled, res.Err} + return nil, &apiError{errorCanceled, res.Err}, qry.Close case promql.ErrQueryTimeout: - return nil, &apiError{errorTimeout, res.Err} + return nil, &apiError{errorTimeout, res.Err}, qry.Close } - return nil, &apiError{errorExec, res.Err} + return nil, &apiError{errorExec, res.Err}, qry.Close } // Optional stats field in response if parameter "stats" is not empty. @@ -328,28 +332,28 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil + }, nil, qry.Close } -func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { +func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) { ctx := r.Context() name := route.Param(ctx, "name") if !model.LabelNameRE.MatchString(name) { - return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} + return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil } q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } defer q.Close() vals, err := q.LabelValues(name) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } - return vals, nil + return vals, nil, nil } var ( @@ -357,10 +361,10 @@ var ( maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) ) -func (api *API) series(r *http.Request) (interface{}, *apiError) { +func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { r.ParseForm() if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} + return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil } var start time.Time @@ -368,7 +372,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { start = minTime @@ -379,7 +383,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { end = maxTime @@ -389,14 +393,14 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } matcherSets = append(matcherSets, matchers) } q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } defer q.Close() @@ -404,7 +408,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { for _, mset := range matcherSets { s, err := q.Select(nil, mset...) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } sets = append(sets, s) } @@ -415,14 +419,14 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { metrics = append(metrics, set.At().Labels()) } if set.Err() != nil { - return nil, &apiError{errorExec, set.Err()} + return nil, &apiError{errorExec, set.Err()}, nil } - return metrics, nil + return metrics, nil, nil } -func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { - return nil, &apiError{errorInternal, fmt.Errorf("not implemented")} +func (api *API) dropSeries(r *http.Request) (interface{}, *apiError, func()) { + return nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil } // Target has the information for one target. @@ -451,7 +455,7 @@ type TargetDiscovery struct { DroppedTargets []*DroppedTarget `json:"droppedTargets"` } -func (api *API) targets(r *http.Request) (interface{}, *apiError) { +func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) { tActive := api.targetRetriever.TargetsActive() tDropped := api.targetRetriever.TargetsDropped() res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))} @@ -479,7 +483,7 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) { DiscoveredLabels: t.DiscoveredLabels().Map(), } } - return res, nil + return res, nil, nil } // AlertmanagerDiscovery has all the active Alertmanagers. @@ -493,7 +497,7 @@ type AlertmanagerTarget struct { URL string `json:"url"` } -func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError) { +func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) { urls := api.alertmanagerRetriever.Alertmanagers() droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers() ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))} @@ -503,22 +507,22 @@ func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError) { for i, url := range droppedURLS { ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()} } - return ams, nil + return ams, nil, nil } type prometheusConfig struct { YAML string `json:"yaml"` } -func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) { +func (api *API) serveConfig(r *http.Request) (interface{}, *apiError, func()) { cfg := &prometheusConfig{ YAML: api.config().String(), } - return cfg, nil + return cfg, nil, nil } -func (api *API) serveFlags(r *http.Request) (interface{}, *apiError) { - return api.flagsMap, nil +func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) { + return api.flagsMap, nil, nil } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { @@ -598,18 +602,18 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } -func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { +func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } r.ParseForm() if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} + return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil } var start time.Time @@ -617,7 +621,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { start = minTime @@ -628,7 +632,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { end = maxTime @@ -637,7 +641,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } var selector tsdbLabels.Selector @@ -646,22 +650,22 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { } if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil { - return nil, &apiError{errorInternal, err} + return nil, &apiError{errorInternal, err}, nil } } - return nil, nil + return nil, nil, nil } -func (api *API) snapshot(r *http.Request) (interface{}, *apiError) { +func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } skipHead, _ := strconv.ParseBool(r.FormValue("skip_head")) db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } var ( @@ -672,31 +676,31 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError) { dir = filepath.Join(snapdir, name) ) if err := os.MkdirAll(dir, 0777); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)} + return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil } if err := db.Snapshot(dir, !skipHead); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)} + return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil } return struct { Name string `json:"name"` - }{name}, nil + }{name}, nil, nil } -func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError) { +func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } if err := db.CleanTombstones(); err != nil { - return nil, &apiError{errorInternal, err} + return nil, &apiError{errorInternal, err}, nil } - return nil, nil + return nil, nil, nil } func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index d935f12ed0..06e1a4ac63 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -530,7 +530,7 @@ func TestEndpoints(t *testing.T) { if err != nil { t.Fatal(err) } - resp, apiErr := test.endpoint(req.WithContext(ctx)) + resp, apiErr, _ := test.endpoint(req.WithContext(ctx)) if apiErr != nil { if test.errType == errorNone { t.Fatalf("Unexpected error: %s", apiErr)