From 71fe0c58a8814e9eb901effac6742d96bae21d15 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 28 Dec 2016 09:16:48 +0100 Subject: [PATCH] promql: misc fixes --- pkg/labels/labels.go | 4 +- promql/ast.go | 2 +- promql/engine.go | 242 +++++++++------------------------------ promql/functions.go | 140 +++++++++++++--------- promql/parse.go | 20 ++-- promql/parse_test.go | 106 ++++++++--------- promql/printer.go | 8 +- promql/test.go | 29 ++--- promql/value.go | 169 +++++++++++++++++++++++++++ storage/tsdb/tsdb.go | 12 +- util/testutil/storage.go | 6 + 11 files changed, 394 insertions(+), 344 deletions(-) create mode 100644 promql/value.go diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 5795fb3dc5..03e8a94f66 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -205,7 +205,9 @@ func (b *Builder) Labels() Labels { return b.base } - res := make(Labels, 0, len(b.base)+len(b.add)-len(b.del)) + // In the general case, labels are removed, modified or moved + // rather than added. + res := make(Labels, 0, len(b.base)) Outer: for _, l := range b.base { for _, n := range b.del { diff --git a/promql/ast.go b/promql/ast.go index 65c57221d0..44e2fdd328 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -184,7 +184,7 @@ func (e *BinaryExpr) Type() ValueType { if e.LHS.Type() == ValueTypeScalar && e.RHS.Type() == ValueTypeScalar { return ValueTypeScalar } - return ValueTypeScalar + return ValueTypeVector } func (*AggregateExpr) expr() {} diff --git a/promql/engine.go b/promql/engine.go index 5cc1f84f7b..86a681ccfe 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -19,11 +19,12 @@ import ( "math" "runtime" "sort" - "strings" + "strconv" "time" "github.com/prometheus/common/log" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" "golang.org/x/net/context" @@ -42,163 +43,6 @@ func convertibleToInt64(v float64) bool { return v <= maxInt64 && v >= minInt64 } -// Value is a generic interface for values resulting from a query evaluation. -type Value interface { - Type() ValueType - String() string -} - -func (Matrix) Type() ValueType { return ValueTypeMatrix } -func (Vector) Type() ValueType { return ValueTypeVector } -func (Scalar) Type() ValueType { return ValueTypeScalar } -func (String) Type() ValueType { return ValueTypeString } - -// ValueType describes a type of a value. -type ValueType string - -// The valid value types. -const ( - ValueTypeNone = "none" - ValueTypeVector = "vector" - ValueTypeScalar = "scalar" - ValueTypeMatrix = "matrix" - ValueTypeString = "string" -) - -// String represents a string value. -type String struct { - V string - T int64 -} - -func (s String) String() string { - return s.V -} - -// Scalar is a data point that's explicitly not associated with a metric. -type Scalar struct { - T int64 - V float64 -} - -func (s Scalar) String() string { - return "" -} - -// Series is a stream of data points belonging to a metric. -type Series struct { - Metric labels.Labels - Points []Point -} - -func (s Series) String() string { - return "" -} - -// Point represents a single data point for a given timestamp. -type Point struct { - T int64 - V float64 -} - -func (s Point) String() string { - return "" -} - -// Sample is a single sample belonging to a metric. -type Sample struct { - Point - - Metric labels.Labels -} - -func (s Sample) String() string { - return "" -} - -// Vector is basically only an alias for model.Samples, but the -// contract is that in a Vector, all Samples have the same timestamp. -type Vector []Sample - -func (vec Vector) String() string { - entries := make([]string, len(vec)) - for i, s := range vec { - entries[i] = s.String() - } - return strings.Join(entries, "\n") -} - -// Matrix is a slice of Seriess that implements sort.Interface and -// has a String method. -type Matrix []Series - -func (m Matrix) String() string { - // TODO(fabxc): sort, or can we rely on order from the querier? - strs := make([]string, len(m)) - - for i, ss := range m { - strs[i] = ss.String() - } - - return strings.Join(strs, "\n") -} - -// Result holds the resulting value of an execution or an error -// if any occurred. -type Result struct { - Err error - Value Value -} - -// Vector returns a Vector if the result value is one. An error is returned if -// the result was an error or the result value is not a Vector. -func (r *Result) Vector() (Vector, error) { - if r.Err != nil { - return nil, r.Err - } - v, ok := r.Value.(Vector) - if !ok { - return nil, fmt.Errorf("query result is not a Vector") - } - return v, nil -} - -// Matrix returns a Matrix. An error is returned if -// the result was an error or the result value is not a Matrix. -func (r *Result) Matrix() (Matrix, error) { - if r.Err != nil { - return nil, r.Err - } - v, ok := r.Value.(Matrix) - if !ok { - return nil, fmt.Errorf("query result is not a range Vector") - } - return v, nil -} - -// Scalar returns a Scalar value. An error is returned if -// the result was an error or the result value is not a Scalar. -func (r *Result) Scalar() (Scalar, error) { - if r.Err != nil { - return Scalar{}, r.Err - } - v, ok := r.Value.(Scalar) - if !ok { - return Scalar{}, fmt.Errorf("query result is not a Scalar") - } - return v, nil -} - -func (r *Result) String() string { - if r.Err != nil { - return r.Err.Error() - } - if r.Value == nil { - return "" - } - return r.Value.String() -} - type ( // ErrQueryTimeout is returned if a query timed out during processing. ErrQueryTimeout string @@ -531,11 +375,17 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: - if n.Offset > maxOffset { + if maxOffset < StalenessDelta { + maxOffset = StalenessDelta + } + if n.Offset+StalenessDelta > maxOffset { maxOffset = n.Offset + StalenessDelta } case *MatrixSelector: - if n.Offset > maxOffset { + if maxOffset < n.Range { + maxOffset = n.Range + } + if n.Offset+n.Range > maxOffset { maxOffset = n.Offset + n.Range } } @@ -544,7 +394,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q mint := s.Start.Add(-maxOffset) - querier, err := ng.queryable.Querier(timeMilliseconds(mint), timeMilliseconds(s.End)) + querier, err := ng.queryable.Querier(timestamp.FromTime(mint), timestamp.FromTime(s.End)) if err != nil { return nil, err } @@ -554,6 +404,8 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q case *VectorSelector: n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) if err != nil { + // TODO(fabxc): use multi-error. + log.Errorln("expand series set:", err) return false } for _, s := range n.series { @@ -564,6 +416,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q case *MatrixSelector: n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...)) if err != nil { + log.Errorln("expand series set:", err) return false } for _, s := range n.series { @@ -736,7 +589,7 @@ func (ev *evaluator) eval(expr Expr) Value { return e.Func.Call(ev, e.Args) case *MatrixSelector: - return ev.MatrixSelector(e) + return ev.matrixSelector(e) case *NumberLiteral: return Scalar{V: e.Val, T: ev.Timestamp} @@ -763,35 +616,33 @@ func (ev *evaluator) eval(expr Expr) Value { return se case *VectorSelector: - return ev.VectorSelector(e) + return ev.vectorSelector(e) } panic(fmt.Errorf("unhandled expression of type: %T", expr)) } -// VectorSelector evaluates a *VectorSelector expression. -func (ev *evaluator) VectorSelector(node *VectorSelector) Vector { +// vectorSelector evaluates a *VectorSelector expression. +func (ev *evaluator) vectorSelector(node *VectorSelector) Vector { var ( - ok bool vec = make(Vector, 0, len(node.series)) refTime = ev.Timestamp - durationMilliseconds(node.Offset) ) for i, it := range node.iterators { - if !it.Seek(refTime) { + ok := it.Seek(refTime) + if !ok { if it.Err() != nil { ev.error(it.Err()) } - continue } t, v := it.Values() - if t > refTime { + if !ok || t > refTime { t, v, ok = it.PeekBack() if !ok || t < refTime-durationMilliseconds(StalenessDelta) { continue } } - vec = append(vec, Sample{ Metric: node.series[i].Labels(), Point: Point{V: v, T: ev.Timestamp}, @@ -800,8 +651,8 @@ func (ev *evaluator) VectorSelector(node *VectorSelector) Vector { return vec } -// MatrixSelector evaluates a *MatrixSelector expression. -func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix { +// matrixSelector evaluates a *MatrixSelector expression. +func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var ( offset = durationMilliseconds(node.Offset) maxt = ev.Timestamp - offset @@ -815,28 +666,30 @@ func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix { Points: make([]Point, 0, 16), } - if !it.Seek(maxt) { + ok := it.Seek(maxt) + if !ok { if it.Err() != nil { ev.error(it.Err()) } - continue } + t, v := it.Values() buf := it.Buffer() for buf.Next() { t, v := buf.Values() // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { - ss.Points = append(ss.Points, Point{T: t + offset, V: v}) + ss.Points = append(ss.Points, Point{T: t, V: v}) } } // The seeked sample might also be in the range. - t, v := it.Values() + t, v = it.Values() if t == maxt { - ss.Points = append(ss.Points, Point{T: t + offset, V: v}) + ss.Points = append(ss.Points, Point{T: t, V: v}) + } + if len(ss.Points) > 0 { + Matrix = append(Matrix, ss) } - - Matrix = append(Matrix, ss) } return Matrix } @@ -999,7 +852,7 @@ func (ev *evaluator) VectorBinop(op itemType, lhs, rhs Vector, matching *VectorM } func hashWithoutLabels(lset labels.Labels, names ...string) uint64 { - cm := make(labels.Labels, 0, len(lset)-len(names)-1) + cm := make(labels.Labels, 0, len(lset)) Outer: for _, l := range lset { @@ -1226,26 +1079,36 @@ func (ev *evaluator) aggregation(op itemType, grouping []string, without bool, k for _, s := range vec { lb := labels.NewBuilder(s.Metric) - if without || keepCommon { + if without { + lb.Del(grouping...) lb.Del(labels.MetricName) } if op == itemCountValues { - lb.Set(valueLabel, fmt.Sprintf("%f", s.V)) // TODO(fabxc): use correct printing. + lb.Set(valueLabel, strconv.FormatFloat(float64(s.V), 'f', -1, 64)) } var ( + groupingKey uint64 metric = lb.Labels() - groupingKey = metric.Hash() ) + if without { + groupingKey = metric.Hash() + } else { + groupingKey = hashForLabels(metric, grouping...) + } + group, ok := result[groupingKey] // Add a new group if it doesn't exist. if !ok { var m labels.Labels - if keepCommon || without { + + if keepCommon { + m = lb.Del(labels.MetricName).Labels() + } else if without { m = metric } else { m = make(labels.Labels, 0, len(grouping)) - for _, l := range s.Metric { + for _, l := range metric { for _, n := range grouping { if l.Name == n { m = append(m, labels.Label{Name: n, Value: l.Value}) @@ -1253,6 +1116,7 @@ func (ev *evaluator) aggregation(op itemType, grouping []string, without bool, k } } } + sort.Sort(m) } result[groupingKey] = &groupedAggregation{ labels: m, @@ -1451,10 +1315,10 @@ func (g *queryGate) Done() { // user facing terminology as defined in the documentation. func documentedType(t ValueType) string { switch t { - case "Vector": - return "instant Vector" - case "Matrix": - return "range Vector" + case "vector": + return "instant vector" + case "matrix": + return "range vector" default: return string(t) } diff --git a/promql/functions.go b/promql/functions.go index 0b57f51475..6aca50601c 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -49,13 +49,14 @@ func funcTime(ev *evaluator, args Expressions) Value { func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value { ms := arg.(*MatrixSelector) - rangeStart := ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset) - rangeEnd := ev.Timestamp - durationMilliseconds(ms.Offset) + var ( + matrix = ev.evalMatrix(ms) + rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset) + rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset) + resultVector = make(Vector, 0, len(matrix)) + ) - resultVector := Vector{} - - MatrixValue := ev.evalMatrix(ms) - for _, samples := range MatrixValue { + for _, samples := range matrix { // No sense in trying to compute a rate without at least two points. Drop // this Vector element. if len(samples.Points) < 2 { @@ -74,11 +75,11 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu resultValue := lastValue - samples.Points[0].V + counterCorrection // Duration between first/last samples and boundary of range. - durationToStart := float64(samples.Points[0].T - rangeStart) - durationToEnd := float64(rangeEnd - samples.Points[len(samples.Points)-1].T) + durationToStart := float64(samples.Points[0].T-rangeStart) / 1000 + durationToEnd := float64(rangeEnd-samples.Points[len(samples.Points)-1].T) / 1000 - sampledInterval := float64(samples.Points[len(samples.Points)-1].T - samples.Points[0].T) - averageDurationBetweenSamples := float64(sampledInterval) / float64(len(samples.Points)-1) + sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000 + averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1) if isCounter && resultValue > 0 && samples.Points[0].V >= 0 { // Counters cannot be negative. If we have any slope at @@ -88,7 +89,7 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu // take the zero point as the start of the series, // thereby avoiding extrapolation to negative counter // values. - durationToZero := float64(sampledInterval) * float64(samples.Points[0].V/resultValue) + durationToZero := sampledInterval * (samples.Points[0].V / resultValue) if durationToZero < durationToStart { durationToStart = durationToZero } @@ -111,9 +112,9 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu } else { extrapolateToInterval += averageDurationBetweenSamples / 2 } - resultValue = resultValue * extrapolateToInterval / sampledInterval + resultValue = resultValue * (extrapolateToInterval / sampledInterval) if isRate { - resultValue = resultValue / 1000 / ms.Range.Seconds() + resultValue = resultValue / ms.Range.Seconds() } resultVector = append(resultVector, Sample{ @@ -303,7 +304,9 @@ func funcSortDesc(ev *evaluator, args Expressions) Value { func funcClampMax(ev *evaluator, args Expressions) Value { vec := ev.evalVector(args[0]) max := ev.evalFloat(args[1]) - for _, el := range vec { + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Min(max, float64(el.V)) } @@ -314,7 +317,9 @@ func funcClampMax(ev *evaluator, args Expressions) Value { func funcClampMin(ev *evaluator, args Expressions) Value { vec := ev.evalVector(args[0]) min := ev.evalFloat(args[1]) - for _, el := range vec { + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Max(min, float64(el.V)) } @@ -356,7 +361,8 @@ func funcDropCommonLabels(ev *evaluator, args Expressions) Value { cnames = append(cnames, n) } - for _, el := range vec { + for i := range vec { + el := &vec[i] el.Metric = labels.NewBuilder(el.Metric).Del(cnames...).Labels() } return vec @@ -374,7 +380,9 @@ func funcRound(ev *evaluator, args Expressions) Value { toNearestInverse := 1.0 / toNearest vec := ev.evalVector(args[0]) - for _, el := range vec { + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse } @@ -396,7 +404,7 @@ func funcScalar(ev *evaluator, args Expressions) Value { } } -// === count_Scalar(Vector ValueTypeVector) float64 === +// === count_scalar(Vector ValueTypeVector) float64 === func funcCountScalar(ev *evaluator, args Expressions) Value { return Scalar{ V: float64(len(ev.evalVector(args[0]))), @@ -441,12 +449,14 @@ func funcCountOverTime(ev *evaluator, args Expressions) Value { // === floor(Vector ValueTypeVector) Vector === func funcFloor(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Floor(float64(el.V)) } - return Vector + return vec } // === max_over_time(Matrix ValueTypeMatrix) Vector === @@ -536,12 +546,14 @@ func funcStdvarOverTime(ev *evaluator, args Expressions) Value { // === abs(Vector ValueTypeVector) Vector === func funcAbs(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Abs(float64(el.V)) } - return Vector + return vec } // === absent(Vector ValueTypeVector) Vector === @@ -568,62 +580,74 @@ func funcAbsent(ev *evaluator, args Expressions) Value { // === ceil(Vector ValueTypeVector) Vector === func funcCeil(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Ceil(float64(el.V)) } - return Vector + return vec } // === exp(Vector ValueTypeVector) Vector === func funcExp(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Exp(float64(el.V)) } - return Vector + return vec } // === sqrt(Vector VectorNode) Vector === func funcSqrt(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Sqrt(float64(el.V)) } - return Vector + return vec } // === ln(Vector ValueTypeVector) Vector === func funcLn(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Log(float64(el.V)) } - return Vector + return vec } // === log2(Vector ValueTypeVector) Vector === func funcLog2(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Log2(float64(el.V)) } - return Vector + return vec } // === log10(Vector ValueTypeVector) Vector === func funcLog10(ev *evaluator, args Expressions) Value { - Vector := ev.evalVector(args[0]) - for _, el := range Vector { + vec := ev.evalVector(args[0]) + for i := range vec { + el := &vec[i] + el.Metric = dropMetricName(el.Metric) el.V = math.Log10(float64(el.V)) } - return Vector + return vec } // linearRegression performs a least-square linear regression analysis on the @@ -636,7 +660,7 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl sumXY, sumX2 float64 ) for _, sample := range samples { - x := float64(sample.T-interceptTime) / 1e6 + x := float64(sample.T-interceptTime) / 1e3 n += 1.0 sumY += sample.V sumX += x @@ -786,7 +810,7 @@ func funcChanges(ev *evaluator, args Expressions) Value { // === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector === func funcLabelReplace(ev *evaluator, args Expressions) Value { var ( - Vector = ev.evalVector(args[0]) + vector = ev.evalVector(args[0]) dst = ev.evalString(args[1]).V repl = ev.evalString(args[2]).V src = ev.evalString(args[3]).V @@ -801,8 +825,10 @@ func funcLabelReplace(ev *evaluator, args Expressions) Value { ev.errorf("invalid destination label name in label_replace(): %s", dst) } - outSet := make(map[uint64]struct{}, len(Vector)) - for _, el := range Vector { + outSet := make(map[uint64]struct{}, len(vector)) + for i := range vector { + el := &vector[i] + srcVal := el.Metric.Get(src) indexes := regex.FindStringSubmatchIndex(srcVal) // If there is no match, no replacement should take place. @@ -825,7 +851,7 @@ func funcLabelReplace(ev *evaluator, args Expressions) Value { } } - return Vector + return vector } // === Vector(s Scalar) Vector === @@ -851,7 +877,9 @@ func dateWrapper(ev *evaluator, args Expressions, f func(time.Time) float64) Val } else { v = ev.evalVector(args[0]) } - for _, el := range v { + for i := range v { + el := &v[i] + el.Metric = dropMetricName(el.Metric) t := time.Unix(int64(el.V), 0).UTC() el.V = f(t) @@ -957,8 +985,8 @@ var functions = map[string]*Function{ ReturnType: ValueTypeVector, Call: funcCountOverTime, }, - "count_Scalar": { - Name: "count_Scalar", + "count_scalar": { + Name: "count_scalar", ArgTypes: []ValueType{ValueTypeVector}, ReturnType: ValueTypeScalar, Call: funcCountScalar, @@ -1132,8 +1160,8 @@ var functions = map[string]*Function{ ReturnType: ValueTypeVector, Call: funcRound, }, - "Scalar": { - Name: "Scalar", + "scalar": { + Name: "scalar", ArgTypes: []ValueType{ValueTypeVector}, ReturnType: ValueTypeScalar, Call: funcScalar, @@ -1180,8 +1208,8 @@ var functions = map[string]*Function{ ReturnType: ValueTypeScalar, Call: funcTime, }, - "Vector": { - Name: "Vector", + "vector": { + Name: "vector", ArgTypes: []ValueType{ValueTypeScalar}, ReturnType: ValueTypeVector, Call: funcVector, @@ -1219,7 +1247,7 @@ func (s vectorByValueHeap) Swap(i, j int) { } func (s *vectorByValueHeap) Push(x interface{}) { - *s = append(*s, x.(Sample)) + *s = append(*s, *(x.(*Sample))) } func (s *vectorByValueHeap) Pop() interface{} { @@ -1248,7 +1276,7 @@ func (s vectorByReverseValueHeap) Swap(i, j int) { } func (s *vectorByReverseValueHeap) Push(x interface{}) { - *s = append(*s, x.(Sample)) + *s = append(*s, *(x.(*Sample))) } func (s *vectorByReverseValueHeap) Pop() interface{} { diff --git a/promql/parse.go b/promql/parse.go index 0538426311..cd783f3272 100644 --- a/promql/parse.go +++ b/promql/parse.go @@ -506,7 +506,7 @@ func (p *parser) balance(lhs Expr, op itemType, rhs Expr, vecMatching *VectorMat if (precd < 0) || (precd == 0 && op.isRightAssociative()) { balanced := p.balance(lhsBE.RHS, op, rhs, vecMatching, returnBool) if lhsBE.Op.isComparisonOperator() && !lhsBE.ReturnBool && balanced.Type() == ValueTypeScalar && lhsBE.LHS.Type() == ValueTypeScalar { - p.errorf("comparisons between Scalars must use BOOL modifier") + p.errorf("comparisons between scalars must use BOOL modifier") } return &BinaryExpr{ Op: lhsBE.Op, @@ -518,7 +518,7 @@ func (p *parser) balance(lhs Expr, op itemType, rhs Expr, vecMatching *VectorMat } } if op.isComparisonOperator() && !returnBool && rhs.Type() == ValueTypeScalar && lhs.Type() == ValueTypeScalar { - p.errorf("comparisons between Scalars must use BOOL modifier") + p.errorf("comparisons between scalars must use BOOL modifier") } return &BinaryExpr{ Op: op, @@ -935,7 +935,7 @@ func (p *parser) offset() time.Duration { return offset } -// VectorSelector parses a new (instant) Vector selector. +// VectorSelector parses a new (instant) vector selector. // // [] // [] @@ -962,7 +962,7 @@ func (p *parser) VectorSelector(name string) *VectorSelector { } if len(matchers) == 0 { - p.errorf("Vector selector must contain label matchers or metric name") + p.errorf("vector selector must contain label matchers or metric name") } // A Vector selector must contain at least one non-empty matcher to prevent // implicit selection of all metrics (e.g. by a typo). @@ -974,7 +974,7 @@ func (p *parser) VectorSelector(name string) *VectorSelector { } } if !notEmpty { - p.errorf("Vector selector must contain at least one non-empty matcher") + p.errorf("vector selector must contain at least one non-empty matcher") } return &VectorSelector{ @@ -1028,7 +1028,7 @@ func (p *parser) checkType(node Node) (typ ValueType) { case *RecordStmt: ty := p.checkType(n.Expr) if ty != ValueTypeVector && ty != ValueTypeScalar { - p.errorf("record statement must have a valid expression of type instant Vector or Scalar but got %s", documentedType(ty)) + p.errorf("record statement must have a valid expression of type instant vector or scalar but got %s", documentedType(ty)) } case Expressions: @@ -1058,12 +1058,12 @@ func (p *parser) checkType(node Node) (typ ValueType) { p.errorf("binary expression does not support operator %q", n.Op) } if (lt != ValueTypeScalar && lt != ValueTypeVector) || (rt != ValueTypeScalar && rt != ValueTypeVector) { - p.errorf("binary expression must contain only Scalar and instant Vector types") + p.errorf("binary expression must contain only scalar and instant vector types") } if (lt != ValueTypeVector || rt != ValueTypeVector) && n.VectorMatching != nil { if len(n.VectorMatching.MatchingLabels) > 0 { - p.errorf("Vector matching only allowed between instant Vectors") + p.errorf("vector matching only allowed between instant vectors") } n.VectorMatching = nil } else { @@ -1079,7 +1079,7 @@ func (p *parser) checkType(node Node) (typ ValueType) { } if (lt == ValueTypeScalar || rt == ValueTypeScalar) && n.Op.isSetOperator() { - p.errorf("set operator %q not allowed in binary Scalar expression", n.Op) + p.errorf("set operator %q not allowed in binary scalar expression", n.Op) } case *Call: @@ -1102,7 +1102,7 @@ func (p *parser) checkType(node Node) (typ ValueType) { p.errorf("only + and - operators allowed for unary expressions") } if t := p.checkType(n.Expr); t != ValueTypeScalar && t != ValueTypeVector { - p.errorf("unary expression only allowed on expressions of type Scalar or instant Vector, got %q", documentedType(t)) + p.errorf("unary expression only allowed on expressions of type scalar or instant vector, got %q", documentedType(t)) } case *NumberLiteral, *MatrixSelector, *StringLiteral, *VectorSelector: diff --git a/promql/parse_test.go b/promql/parse_test.go index a09f952655..10b3258785 100644 --- a/promql/parse_test.go +++ b/promql/parse_test.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" ) var testExpr = []struct { @@ -31,7 +32,7 @@ var testExpr = []struct { fail bool // Whether parsing is supposed to fail. errMsg string // If not empty the parsing error has to contain this string. }{ - // Scalars and Scalar-to-Scalar operations. + // Scalars and scalar-to-scalar operations. { input: "1", expected: &NumberLiteral{1}, @@ -212,19 +213,19 @@ var testExpr = []struct { }, { input: "1 and 1", fail: true, - errMsg: "set operator \"and\" not allowed in binary Scalar expression", + errMsg: "set operator \"and\" not allowed in binary scalar expression", }, { input: "1 == 1", fail: true, - errMsg: "parse error at char 7: comparisons between Scalars must use BOOL modifier", + errMsg: "parse error at char 7: comparisons between scalars must use BOOL modifier", }, { input: "1 or 1", fail: true, - errMsg: "set operator \"or\" not allowed in binary Scalar expression", + errMsg: "set operator \"or\" not allowed in binary scalar expression", }, { input: "1 unless 1", fail: true, - errMsg: "set operator \"unless\" not allowed in binary Scalar expression", + errMsg: "set operator \"unless\" not allowed in binary scalar expression", }, { input: "1 !~ 1", fail: true, @@ -236,11 +237,11 @@ var testExpr = []struct { }, { input: `-"string"`, fail: true, - errMsg: `unary expression only allowed on expressions of type Scalar or instant Vector, got "string"`, + errMsg: `unary expression only allowed on expressions of type scalar or instant vector, got "string"`, }, { input: `-test[5m]`, fail: true, - errMsg: `unary expression only allowed on expressions of type Scalar or instant Vector, got "range Vector"`, + errMsg: `unary expression only allowed on expressions of type scalar or instant vector, got "range vector"`, }, { input: `*test`, fail: true, @@ -747,35 +748,35 @@ var testExpr = []struct { }, { input: "foo and 1", fail: true, - errMsg: "set operator \"and\" not allowed in binary Scalar expression", + errMsg: "set operator \"and\" not allowed in binary scalar expression", }, { input: "1 and foo", fail: true, - errMsg: "set operator \"and\" not allowed in binary Scalar expression", + errMsg: "set operator \"and\" not allowed in binary scalar expression", }, { input: "foo or 1", fail: true, - errMsg: "set operator \"or\" not allowed in binary Scalar expression", + errMsg: "set operator \"or\" not allowed in binary scalar expression", }, { input: "1 or foo", fail: true, - errMsg: "set operator \"or\" not allowed in binary Scalar expression", + errMsg: "set operator \"or\" not allowed in binary scalar expression", }, { input: "foo unless 1", fail: true, - errMsg: "set operator \"unless\" not allowed in binary Scalar expression", + errMsg: "set operator \"unless\" not allowed in binary scalar expression", }, { input: "1 unless foo", fail: true, - errMsg: "set operator \"unless\" not allowed in binary Scalar expression", + errMsg: "set operator \"unless\" not allowed in binary scalar expression", }, { input: "1 or on(bar) foo", fail: true, - errMsg: "Vector matching only allowed between instant Vectors", + errMsg: "vector matching only allowed between instant vectors", }, { input: "foo == on(bar) 10", fail: true, - errMsg: "Vector matching only allowed between instant Vectors", + errMsg: "vector matching only allowed between instant vectors", }, { input: "foo and on(bar) group_left(baz) bar", fail: true, @@ -914,23 +915,23 @@ var testExpr = []struct { }, { input: `{}`, fail: true, - errMsg: "Vector selector must contain label matchers or metric name", + errMsg: "vector selector must contain label matchers or metric name", }, { input: `{x=""}`, fail: true, - errMsg: "Vector selector must contain at least one non-empty matcher", + errMsg: "vector selector must contain at least one non-empty matcher", }, { input: `{x=~".*"}`, fail: true, - errMsg: "Vector selector must contain at least one non-empty matcher", + errMsg: "vector selector must contain at least one non-empty matcher", }, { input: `{x!~".+"}`, fail: true, - errMsg: "Vector selector must contain at least one non-empty matcher", + errMsg: "vector selector must contain at least one non-empty matcher", }, { input: `{x!="a"}`, fail: true, - errMsg: "Vector selector must contain at least one non-empty matcher", + errMsg: "vector selector must contain at least one non-empty matcher", }, { input: `foo{__name__="bar"}`, fail: true, @@ -1285,11 +1286,11 @@ var testExpr = []struct { }, { input: `topk(some_metric, other_metric)`, fail: true, - errMsg: "parse error at char 32: expected type Scalar in aggregation parameter, got instant Vector", + errMsg: "parse error at char 32: expected type scalar in aggregation parameter, got instant vector", }, { input: `count_values(5, other_metric)`, fail: true, - errMsg: "parse error at char 30: expected type string in aggregation parameter, got Scalar", + errMsg: "parse error at char 30: expected type string in aggregation parameter, got scalar", }, // Test function calls. { @@ -1363,7 +1364,7 @@ var testExpr = []struct { }, { input: "floor(1)", fail: true, - errMsg: "expected type instant Vector in call to function \"floor\", got Scalar", + errMsg: "expected type instant vector in call to function \"floor\", got scalar", }, { input: "non_existent_function_far_bar()", fail: true, @@ -1371,7 +1372,7 @@ var testExpr = []struct { }, { input: "rate(some_metric)", fail: true, - errMsg: "expected type range Vector in call to function \"rate\", got instant Vector", + errMsg: "expected type range vector in call to function \"rate\", got instant vector", }, // Fuzzing regression tests. { @@ -1532,7 +1533,7 @@ var testStatement = []struct { summary = "Global request rate low", description = "The global request rate is low" } - + foo = bar{label1="value1"} ALERT BazAlert IF foo > 10 @@ -1591,7 +1592,6 @@ var testStatement = []struct { mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar"), }, }, - Labels: nil, }, &AlertStmt{ Name: "BazAlert", @@ -1605,7 +1605,6 @@ var testStatement = []struct { }, RHS: &NumberLiteral{10}, }, - Labels: labels.Labels{}, Annotations: labels.FromStrings( "summary", "Baz", "description", "BazAlert", @@ -1782,59 +1781,41 @@ func mustGetFunction(name string) *Function { var testSeries = []struct { input string - expectedMetric model.Metric + expectedMetric labels.Labels expectedValues []sequenceValue fail bool }{ { input: `{} 1 2 3`, - expectedMetric: model.Metric{}, + expectedMetric: labels.Labels{}, expectedValues: newSeq(1, 2, 3), }, { - input: `{a="b"} -1 2 3`, - expectedMetric: model.Metric{ - "a": "b", - }, + input: `{a="b"} -1 2 3`, + expectedMetric: labels.FromStrings("a", "b"), expectedValues: newSeq(-1, 2, 3), }, { - input: `my_metric 1 2 3`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - }, + input: `my_metric 1 2 3`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric"), expectedValues: newSeq(1, 2, 3), }, { - input: `my_metric{} 1 2 3`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - }, + input: `my_metric{} 1 2 3`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric"), expectedValues: newSeq(1, 2, 3), }, { - input: `my_metric{a="b"} 1 2 3`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - "a": "b", - }, + input: `my_metric{a="b"} 1 2 3`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"), expectedValues: newSeq(1, 2, 3), }, { - input: `my_metric{a="b"} 1 2 3-10x4`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - "a": "b", - }, + input: `my_metric{a="b"} 1 2 3-10x4`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"), expectedValues: newSeq(1, 2, 3, -7, -17, -27, -37), }, { - input: `my_metric{a="b"} 1 2 3-0x4`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - "a": "b", - }, + input: `my_metric{a="b"} 1 2 3-0x4`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"), expectedValues: newSeq(1, 2, 3, 3, 3, 3, 3), }, { - input: `my_metric{a="b"} 1 3 _ 5 _x4`, - expectedMetric: model.Metric{ - model.MetricNameLabel: "my_metric", - "a": "b", - }, + input: `my_metric{a="b"} 1 3 _ 5 _x4`, + expectedMetric: labels.FromStrings(labels.MetricName, "my_metric", "a", "b"), expectedValues: newSeq(1, 3, none, 5, none, none, none, none), }, { input: `my_metric{a="b"} 1 3 _ 5 _a4`, @@ -1884,6 +1865,9 @@ func TestParseSeries(t *testing.T) { t.Fatalf("failure expected, but passed") } + require.Equal(t, test.expectedMetric, metric) + require.Equal(t, test.expectedValues, vals) + if !reflect.DeepEqual(vals, test.expectedValues) || !reflect.DeepEqual(metric, test.expectedMetric) { t.Errorf("error in input: \n\n%s\n", test.input) t.Fatalf("no match\n\nexpected:\n%s %s\ngot: \n%s %s\n", test.expectedMetric, test.expectedValues, metric, vals) diff --git a/promql/printer.go b/promql/printer.go index 0b7d605451..6140a25312 100644 --- a/promql/printer.go +++ b/promql/printer.go @@ -146,7 +146,7 @@ func (node *AggregateExpr) String() string { } else { format = "%s BY (%s)" } - aggrString = fmt.Sprintf(format, aggrString, node.Grouping) + aggrString = fmt.Sprintf(format, aggrString, strings.Join(node.Grouping, ", ")) } if node.KeepCommonLabels { aggrString += " KEEP_COMMON" @@ -164,9 +164,9 @@ func (node *BinaryExpr) String() string { vm := node.VectorMatching if vm != nil && (len(vm.MatchingLabels) > 0 || vm.On) { if vm.On { - matching = fmt.Sprintf(" ON(%s)", vm.MatchingLabels) + matching = fmt.Sprintf(" ON(%s)", strings.Join(vm.MatchingLabels, ", ")) } else { - matching = fmt.Sprintf(" IGNORING(%s)", vm.MatchingLabels) + matching = fmt.Sprintf(" IGNORING(%s)", strings.Join(vm.MatchingLabels, ", ")) } if vm.Card == CardManyToOne || vm.Card == CardOneToMany { matching += " GROUP_" @@ -175,7 +175,7 @@ func (node *BinaryExpr) String() string { } else { matching += "RIGHT" } - matching += fmt.Sprintf("(%s)", vm.Include) + matching += fmt.Sprintf("(%s)", strings.Join(vm.Include, ", ")) } } return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, returnBool, matching, node.RHS) diff --git a/promql/test.go b/promql/test.go index 612712fc9b..7b3bbe9884 100644 --- a/promql/test.go +++ b/promql/test.go @@ -42,7 +42,7 @@ const ( epsilon = 0.000001 // Relative error allowed for sample values. ) -var testStartTime = time.Time{} +var testStartTime = time.Unix(0, 0) // Test is a sequence of read and write commands that are run // against a test storage. @@ -281,18 +281,13 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...sequenceValue) { // append the defined time series to the storage. func (cmd *loadCmd) append(a storage.Appender) { - // TODO(fabxc): commented out until Appender refactoring. - // for fp, samples := range cmd.defs { - // met := cmd.metrics[fp] - // for _, smpl := range samples { - // s := &model.Sample{ - // Metric: met, - // Value: smpl.Value, - // Timestamp: smpl.Timestamp, - // } - // a.Append(s) - // } - // } + for h, smpls := range cmd.defs { + m := cmd.metrics[h] + + for _, s := range smpls { + a.Add(m, s.T, s.V) + } + } } // evalCmd is a command that evaluates an expression for the given time (range) @@ -381,6 +376,7 @@ func (ev *evalCmd) compareResult(result Value) error { if !ev.instant { return fmt.Errorf("received instant result on range evaluation") } + seen := map[uint64]bool{} for pos, v := range val { fp := v.Metric.Hash() @@ -391,7 +387,7 @@ func (ev *evalCmd) compareResult(result Value) error { if ev.ordered && exp.pos != pos+1 { return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1) } - if !almostEqual(float64(exp.vals[0].value), float64(v.V)) { + if !almostEqual(exp.vals[0].value, v.V) { return fmt.Errorf("expected %v for %s but got %v", exp.vals[0].value, v.Metric, v.V) } @@ -485,7 +481,7 @@ func (t *Test) exec(tc testCommand) error { if cmd.fail { return nil } - return fmt.Errorf("error evaluating query: %s", res.Err) + return fmt.Errorf("error evaluating query %q: %s", cmd.expr, res.Err) } if res.Err == nil && cmd.fail { return fmt.Errorf("expected error evaluating query but got none") @@ -514,8 +510,7 @@ func (t *Test) clear() { } t.storage = testutil.NewStorage(t) - // TODO(fabxc): add back - // t.queryEngine = NewEngine(t.storage, nil) + t.queryEngine = NewEngine(t.storage, nil) t.context, t.cancelCtx = context.WithCancel(context.Background()) } diff --git a/promql/value.go b/promql/value.go new file mode 100644 index 0000000000..95eaaebd49 --- /dev/null +++ b/promql/value.go @@ -0,0 +1,169 @@ +package promql + +import ( + "fmt" + "strings" + + "github.com/prometheus/prometheus/pkg/labels" +) + +// Value is a generic interface for values resulting from a query evaluation. +type Value interface { + Type() ValueType + String() string +} + +func (Matrix) Type() ValueType { return ValueTypeMatrix } +func (Vector) Type() ValueType { return ValueTypeVector } +func (Scalar) Type() ValueType { return ValueTypeScalar } +func (String) Type() ValueType { return ValueTypeString } + +// ValueType describes a type of a value. +type ValueType string + +// The valid value types. +const ( + ValueTypeNone = "none" + ValueTypeVector = "vector" + ValueTypeScalar = "scalar" + ValueTypeMatrix = "matrix" + ValueTypeString = "string" +) + +// String represents a string value. +type String struct { + V string + T int64 +} + +func (s String) String() string { + return s.V +} + +// Scalar is a data point that's explicitly not associated with a metric. +type Scalar struct { + T int64 + V float64 +} + +func (s Scalar) String() string { + return fmt.Sprintf("scalar: %v @[%v]", s.V, s.T) +} + +// Series is a stream of data points belonging to a metric. +type Series struct { + Metric labels.Labels + Points []Point +} + +func (s Series) String() string { + vals := make([]string, len(s.Points)) + for i, v := range s.Points { + vals[i] = v.String() + } + return fmt.Sprintf("%s =>\n%s", s.Metric, strings.Join(vals, "\n")) +} + +// Point represents a single data point for a given timestamp. +type Point struct { + T int64 + V float64 +} + +func (p Point) String() string { + return fmt.Sprintf("%f @[%d]", p.V, p.T) +} + +// Sample is a single sample belonging to a metric. +type Sample struct { + Point + + Metric labels.Labels +} + +func (s Sample) String() string { + return fmt.Sprintf("%s => %s", s.Metric, s.Point) +} + +// Vector is basically only an alias for model.Samples, but the +// contract is that in a Vector, all Samples have the same timestamp. +type Vector []Sample + +func (vec Vector) String() string { + entries := make([]string, len(vec)) + for i, s := range vec { + entries[i] = s.String() + } + return strings.Join(entries, "\n") +} + +// Matrix is a slice of Seriess that implements sort.Interface and +// has a String method. +type Matrix []Series + +func (m Matrix) String() string { + // TODO(fabxc): sort, or can we rely on order from the querier? + strs := make([]string, len(m)) + + for i, ss := range m { + strs[i] = ss.String() + } + + return strings.Join(strs, "\n") +} + +// Result holds the resulting value of an execution or an error +// if any occurred. +type Result struct { + Err error + Value Value +} + +// Vector returns a Vector if the result value is one. An error is returned if +// the result was an error or the result value is not a Vector. +func (r *Result) Vector() (Vector, error) { + if r.Err != nil { + return nil, r.Err + } + v, ok := r.Value.(Vector) + if !ok { + return nil, fmt.Errorf("query result is not a Vector") + } + return v, nil +} + +// Matrix returns a Matrix. An error is returned if +// the result was an error or the result value is not a Matrix. +func (r *Result) Matrix() (Matrix, error) { + if r.Err != nil { + return nil, r.Err + } + v, ok := r.Value.(Matrix) + if !ok { + return nil, fmt.Errorf("query result is not a range Vector") + } + return v, nil +} + +// Scalar returns a Scalar value. An error is returned if +// the result was an error or the result value is not a Scalar. +func (r *Result) Scalar() (Scalar, error) { + if r.Err != nil { + return Scalar{}, r.Err + } + v, ok := r.Value.(Scalar) + if !ok { + return Scalar{}, fmt.Errorf("query result is not a Scalar") + } + return v, nil +} + +func (r *Result) String() string { + if r.Err != nil { + return r.Err.Error() + } + if r.Value == nil { + return "" + } + return r.Value.String() +} diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index d2aa4f8c1a..1e5416e442 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -24,6 +24,7 @@ func Open(path string) (storage.Storage, error) { } func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) { + // fmt.Println("new querier at", timestamp.Time(mint), timestamp.Time(maxt), maxt-mint) return querier{q: a.db.Querier(mint, maxt)}, nil } @@ -48,9 +49,7 @@ func (q querier) Select(oms ...*labels.Matcher) storage.SeriesSet { ms = append(ms, convertMatcher(om)) } - set := q.q.Select(ms...) - - return seriesSet{set: set} + return seriesSet{set: q.q.Select(ms...)} } func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) } @@ -75,8 +74,11 @@ type appender struct { a tsdb.Appender } -func (a appender) Add(lset labels.Labels, t int64, v float64) { a.a.Add(toTSDBLabels(lset), t, v) } -func (a appender) Commit() error { return a.a.Commit() } +func (a appender) Add(lset labels.Labels, t int64, v float64) { + // fmt.Println("add", lset, timestamp.Time(t), v) + a.a.Add(toTSDBLabels(lset), t, v) +} +func (a appender) Commit() error { return a.a.Commit() } func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { switch m.Type { diff --git a/util/testutil/storage.go b/util/testutil/storage.go index b24e6f5f9b..34181ca28b 100644 --- a/util/testutil/storage.go +++ b/util/testutil/storage.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" + "github.com/prometheus/common/log" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/tsdb" ) @@ -15,6 +16,9 @@ func NewStorage(t T) storage.Storage { if err != nil { t.Fatalf("Opening test dir failed: %s", err) } + + log.With("dir", dir).Debugln("opening test storage") + db, err := tsdb.Open(dir) if err != nil { t.Fatalf("Opening test storage failed: %s", err) @@ -28,6 +32,8 @@ type testStorage struct { } func (s testStorage) Close() error { + log.With("dir", s.dir).Debugln("closing test storage") + if err := s.Storage.Close(); err != nil { return err }