From 1f3821379c23ab5855df88ad33f5bb5ecfd68a03 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 1 Jun 2023 17:17:04 +0000 Subject: [PATCH 1/4] promql: refactor: extract fn to wait on concurrency limit Signed-off-by: Bryan Boreham --- promql/engine.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 8a64fdf394..8730466f75 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -589,18 +589,11 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) defer execSpanTimer.Finish() - queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) - // Log query in active log. The active log guarantees that we don't run over - // MaxConcurrent queries. - if ng.activeQueryTracker != nil { - queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) - if err != nil { - queueSpanTimer.Finish() - return nil, nil, contextErr(err, "query queue") - } - defer ng.activeQueryTracker.Delete(queryIndex) + if finish, err := ng.queueActive(ctx, q); err != nil { + return nil, nil, err + } else { + defer finish() } - queueSpanTimer.Finish() // Cancel when execution is done or an error was raised. defer q.cancel() @@ -623,6 +616,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) } +// Log query in active log. The active log guarantees that we don't run over +// MaxConcurrent queries. +func (ng *Engine) queueActive(ctx context.Context, q *query) (func(), error) { + if ng.activeQueryTracker == nil { + return func() {}, nil + } + queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) + queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) + queueSpanTimer.Finish() + return func() { ng.activeQueryTracker.Delete(queryIndex) }, err +} + func timeMilliseconds(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond/time.Nanosecond) } From 71fc4f1516b8264606daca7bf7c0e011b7f91f2e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 1 Jun 2023 17:54:17 +0000 Subject: [PATCH 2/4] promql: refactor: create query object before parsing Signed-off-by: Bryan Boreham --- promql/engine.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 8730466f75..81fb3c9168 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -409,15 +409,15 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) { // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { + pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0) expr, err := parser.ParseExpr(qs) if err != nil { return nil, err } - qry, err := ng.newQuery(q, opts, expr, ts, ts, 0) - if err != nil { + if err := ng.validateOpts(expr); err != nil { return nil, err } - qry.q = qs + *pExpr = PreprocessExpr(expr, ts, ts) return qry, nil } @@ -425,27 +425,23 @@ func (ng *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts * // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. func (ng *Engine) NewRangeQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { + pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval) expr, err := parser.ParseExpr(qs) if err != nil { return nil, err } + if err := ng.validateOpts(expr); err != nil { + return nil, err + } if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } - qry, err := ng.newQuery(q, opts, expr, start, end, interval) - if err != nil { - return nil, err - } - qry.q = qs + *pExpr = PreprocessExpr(expr, start, end) return qry, nil } -func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) { - if err := ng.validateOpts(expr); err != nil { - return nil, err - } - +func (ng *Engine) newQuery(q storage.Queryable, qs string, opts *QueryOpts, start, end time.Time, interval time.Duration) (*parser.Expr, *query) { // Default to empty QueryOpts if not provided. if opts == nil { opts = &QueryOpts{} @@ -457,20 +453,20 @@ func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Exp } es := &parser.EvalStmt{ - Expr: PreprocessExpr(expr, start, end), Start: start, End: end, Interval: interval, LookbackDelta: lookbackDelta, } qry := &query{ + q: qs, stmt: es, ng: ng, stats: stats.NewQueryTimers(), sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats), queryable: q, } - return qry, nil + return &es.Expr, qry } var ( From bb0d8320ddc3b8f8d4888524a7f631f904a5ec6f Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 1 Jun 2023 18:16:05 +0000 Subject: [PATCH 3/4] promql: include parsing in active-query tracking So that the max-concurrency limit is applied. Signed-off-by: Bryan Boreham --- promql/engine.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 81fb3c9168..085a7d23c4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -408,8 +408,13 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) { } // NewInstantQuery returns an evaluation query for the given expression at the given time. -func (ng *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { +func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0) + if finish, err := ng.queueActive(ctx, qry); err != nil { + return nil, err + } else { + defer finish() + } expr, err := parser.ParseExpr(qs) if err != nil { return nil, err @@ -424,8 +429,13 @@ func (ng *Engine) NewInstantQuery(_ context.Context, q storage.Queryable, opts * // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. -func (ng *Engine) NewRangeQuery(_ context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { +func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval) + if finish, err := ng.queueActive(ctx, qry); err != nil { + return nil, err + } else { + defer finish() + } expr, err := parser.ParseExpr(qs) if err != nil { return nil, err From 67d2ef004d84427626fadb6e62b420b80712a7f9 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 1 Jun 2023 18:36:34 +0000 Subject: [PATCH 4/4] Placate lint I think the version using scoping was better, but I'm out of energy to fight the linter. Signed-off-by: Bryan Boreham --- promql/engine.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 085a7d23c4..f29db3a647 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -410,11 +410,11 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) { // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0) - if finish, err := ng.queueActive(ctx, qry); err != nil { + finishQueue, err := ng.queueActive(ctx, qry) + if err != nil { return nil, err - } else { - defer finish() } + defer finishQueue() expr, err := parser.ParseExpr(qs) if err != nil { return nil, err @@ -431,11 +431,11 @@ func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts // the resolution set by the interval. func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval) - if finish, err := ng.queueActive(ctx, qry); err != nil { + finishQueue, err := ng.queueActive(ctx, qry) + if err != nil { return nil, err - } else { - defer finish() } + defer finishQueue() expr, err := parser.ParseExpr(qs) if err != nil { return nil, err @@ -595,11 +595,11 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws storag execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) defer execSpanTimer.Finish() - if finish, err := ng.queueActive(ctx, q); err != nil { + finishQueue, err := ng.queueActive(ctx, q) + if err != nil { return nil, nil, err - } else { - defer finish() } + defer finishQueue() // Cancel when execution is done or an error was raised. defer q.cancel()