From 9adad8ad3045a93fff03400970f1aa77cc034cb0 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Tue, 28 Jan 2020 21:38:49 +0100 Subject: [PATCH] Remove MaxConcurrent from the PromQL engine opts (#6712) Since we use ActiveQueryTracker to check for concurrency in d992c36b3a08470532fb4fb36a768a20ec8f2e12 it does not make sense to keep the MaxConcurrent value as an option of the PromQL engine. This pull request removes it from the PromQL engine options, sets the max concurrent metric to -1 if there is no active query tracker, and use the value of the active query tracker otherwise. It removes dead code and also will inform people who import the promql package that we made that change, as it breaks the EngineOpts struct. Signed-off-by: Julien Pivotto --- cmd/prometheus/main.go | 1 - promql/bench_test.go | 9 +++-- promql/engine.go | 8 +++-- promql/engine_test.go | 77 ++++++++++++++++++---------------------- promql/functions_test.go | 9 +++-- promql/query_logger.go | 28 +++++++++------ promql/test.go | 18 +++++----- rules/alerting_test.go | 9 +++-- rules/manager_test.go | 36 +++++++++---------- rules/recording_test.go | 18 +++++----- web/api/v1/api_test.go | 9 +++-- 11 files changed, 105 insertions(+), 117 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 77e8916936..9ffe3a906f 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -357,7 +357,6 @@ func main() { opts = promql.EngineOpts{ Logger: log.With(logger, "component", "query engine"), Reg: prometheus.DefaultRegisterer, - MaxConcurrent: cfg.queryConcurrency, MaxSamples: cfg.queryMaxSamples, Timeout: time.Duration(cfg.queryTimeout), ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), diff --git a/promql/bench_test.go b/promql/bench_test.go index 56ef789b1c..58f12139e3 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -29,11 +29,10 @@ func BenchmarkRangeQuery(b *testing.B) { storage := teststorage.New(b) defer storage.Close() opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 50000000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 50000000, + Timeout: 100 * time.Second, } engine := NewEngine(opts) diff --git a/promql/engine.go b/promql/engine.go index 96d66f9778..0add9e1473 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -217,7 +217,6 @@ func contextErr(err error, env string) error { type EngineOpts struct { Logger log.Logger Reg prometheus.Registerer - MaxConcurrent int MaxSamples int Timeout time.Duration ActiveQueryTracker *ActiveQueryTracker @@ -299,7 +298,12 @@ func NewEngine(opts EngineOpts) *Engine { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), } - metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent)) + + if t := opts.ActiveQueryTracker; t != nil { + metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent())) + } else { + metrics.maxConcurrentQueries.Set(-1) + } if opts.Reg != nil { opts.Reg.MustRegister( diff --git a/promql/engine_test.go b/promql/engine_test.go index 677a1468b6..a13cdfd77a 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -41,7 +41,6 @@ func TestQueryConcurrency(t *testing.T) { opts := EngineOpts{ Logger: nil, Reg: nil, - MaxConcurrent: maxConcurrency, MaxSamples: 10, Timeout: 100 * time.Second, ActiveQueryTracker: queryTracker, @@ -60,7 +59,7 @@ func TestQueryConcurrency(t *testing.T) { return nil } - for i := 0; i < opts.MaxConcurrent; i++ { + for i := 0; i < maxConcurrency; i++ { q := engine.newTestQuery(f) go q.Exec(ctx) select { @@ -92,18 +91,17 @@ func TestQueryConcurrency(t *testing.T) { } // Terminate remaining queries. - for i := 0; i < opts.MaxConcurrent; i++ { + for i := 0; i < maxConcurrency; i++ { block <- struct{}{} } } func TestQueryTimeout(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10, - Timeout: 5 * time.Millisecond, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 5 * time.Millisecond, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -127,11 +125,10 @@ const errQueryCanceled = ErrQueryCanceled("test statement execution") func TestQueryCancel(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -198,11 +195,10 @@ func (e errSeriesSet) Err() error { return e.err } func TestQueryError(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) errStorage := ErrStorage{errors.New("storage error")} @@ -261,11 +257,10 @@ func (*paramCheckerQuerier) Close() error { r func TestParamsSetCorrectly(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } // Set the lookback to be smaller and reset at the end. @@ -466,11 +461,10 @@ func TestParamsSetCorrectly(t *testing.T) { func TestEngineShutdown(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -1149,11 +1143,10 @@ func (f *FakeQueryLogger) Log(l ...interface{}) error { func TestQueryLogger_basic(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) @@ -1201,11 +1194,10 @@ func TestQueryLogger_basic(t *testing.T) { func TestQueryLogger_fields(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) @@ -1231,11 +1223,10 @@ func TestQueryLogger_fields(t *testing.T) { func TestQueryLogger_error(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) diff --git a/promql/functions_test.go b/promql/functions_test.go index bd0ced3f42..760d6e3821 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -31,11 +31,10 @@ func TestDeriv(t *testing.T) { storage := teststorage.New(t) defer storage.Close() opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10000, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 10 * time.Second, } engine := NewEngine(opts) diff --git a/promql/query_logger.go b/promql/query_logger.go index e63e5cde52..1ae1bec5f3 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -28,9 +28,10 @@ import ( ) type ActiveQueryTracker struct { - mmapedFile []byte - getNextIndex chan int - logger log.Logger + mmapedFile []byte + getNextIndex chan int + logger log.Logger + maxConcurrent int } type Entry struct { @@ -102,13 +103,13 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, er return fileAsBytes, err } -func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.Logger) *ActiveQueryTracker { +func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker { err := os.MkdirAll(localStoragePath, 0777) if err != nil { level.Error(logger).Log("msg", "Failed to create directory for logging active queries") } - filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxQueries*entrySize + filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize logUnfinishedQueries(filename, filesize, logger) fileAsBytes, err := getMMapedFile(filename, filesize, logger) @@ -118,12 +119,13 @@ func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.L copy(fileAsBytes, "[") activeQueryTracker := ActiveQueryTracker{ - mmapedFile: fileAsBytes, - getNextIndex: make(chan int, maxQueries), - logger: logger, + mmapedFile: fileAsBytes, + getNextIndex: make(chan int, maxConcurrent), + logger: logger, + maxConcurrent: maxConcurrent, } - activeQueryTracker.generateIndices(maxQueries) + activeQueryTracker.generateIndices(maxConcurrent) return &activeQueryTracker } @@ -164,12 +166,16 @@ func newJSONEntry(query string, logger log.Logger) []byte { return jsonEntry } -func (tracker ActiveQueryTracker) generateIndices(maxQueries int) { - for i := 0; i < maxQueries; i++ { +func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) { + for i := 0; i < maxConcurrent; i++ { tracker.getNextIndex <- 1 + (i * entrySize) } } +func (tracker ActiveQueryTracker) GetMaxConcurrent() int { + return tracker.maxConcurrent +} + func (tracker ActiveQueryTracker) Delete(insertIndex int) { copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize)) tracker.getNextIndex <- insertIndex diff --git a/promql/test.go b/promql/test.go index 7c8c688e55..8c2b0497fc 100644 --- a/promql/test.go +++ b/promql/test.go @@ -516,11 +516,10 @@ func (t *Test) clear() { t.storage = teststorage.New(t) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, } t.queryEngine = NewEngine(opts) @@ -630,11 +629,10 @@ func (ll *LazyLoader) clear() { ll.storage = teststorage.New(ll) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, } ll.queryEngine = NewEngine(opts) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 7bc37d5c13..59505fd6dc 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -298,11 +298,10 @@ func TestAlertingRuleDuplicate(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) diff --git a/rules/manager_test.go b/rules/manager_test.go index 204c4f51ab..1f62009e07 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -502,11 +502,10 @@ func TestStaleness(t *testing.T) { storage := teststorage.New(t) defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ @@ -689,11 +688,10 @@ func TestUpdate(t *testing.T) { storage := teststorage.New(t) defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ @@ -821,11 +819,10 @@ func TestNotify(t *testing.T) { storage := teststorage.New(t) defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) var lastNotified []*Alert @@ -889,11 +886,10 @@ func TestMetricsUpdate(t *testing.T) { registry := prometheus.NewRegistry() defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ diff --git a/rules/recording_test.go b/rules/recording_test.go index 6a333d2901..c00a244c03 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -31,11 +31,10 @@ func TestRuleEval(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) @@ -99,11 +98,10 @@ func TestRuleEvalDuplicate(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 063c6721b3..e8e679e388 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -228,11 +228,10 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 100 * time.Second, } engine := promql.NewEngine(engineOpts)