From fa1e90003b9e05c697b4fb425f276968d06ff5b8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 3 Feb 2015 08:04:27 +0100 Subject: [PATCH] Query timeout added. This is related to #454. Queries now timeout after a duration set by the -query.timeout flag. The TotalEvalTimer is now started/stopped inside any of the ast.Eval* functions. --- rules/ast/ast.go | 24 ++++++++++++++++++++++-- rules/ast/printer.go | 6 ++++++ rules/ast/query_analyzer.go | 30 ++++++++++++++++++++++++++++-- stats/timer.go | 5 +++++ web/api/query.go | 3 --- 5 files changed, 61 insertions(+), 7 deletions(-) diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 4940188394..a58356f6c6 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -29,7 +29,16 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") +var ( + stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") + queryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.") +) + +type queryTimeoutError struct { + timeoutAfter time.Duration +} + +func (e queryTimeoutError) Error() string { return fmt.Sprintf("query timeout after %v", e.timeoutAfter) } // ---------------------------------------------------------------------------- // Raw data value types. @@ -391,16 +400,24 @@ func labelsToKey(labels clientmodel.Metric) uint64 { // EvalVectorInstant evaluates a VectorNode with an instant query. func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { + totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() + defer totalEvalTimer.Stop() + closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) if err != nil { return nil, err } defer closer.Close() + if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout { + return nil, queryTimeoutError{et} + } return node.Eval(timestamp), nil } // EvalVectorRange evaluates a VectorNode with a range query. func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) { + totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() + defer totalEvalTimer.Stop() // Explicitly initialize to an empty matrix since a nil Matrix encodes to // null in JSON. matrix := Matrix{} @@ -413,10 +430,13 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod } defer closer.Close() - // TODO implement watchdog timer for long-running queries. evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start() sampleStreams := map[uint64]*SampleStream{} for t := start; !t.After(end); t = t.Add(interval) { + if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout { + evalTimer.Stop() + return nil, queryTimeoutError{et} + } vector := node.Eval(t) for _, sample := range vector { samplePair := metric.SamplePair{ diff --git a/rules/ast/printer.go b/rules/ast/printer.go index eba3ffe72f..7c9ebcade3 100644 --- a/rules/ast/printer.go +++ b/rules/ast/printer.go @@ -160,6 +160,9 @@ func TypedValueToJSON(data interface{}, typeStr string) string { // EvalToString evaluates the given node into a string of the given format. func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string { + totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() + defer totalEvalTimer.Stop() + prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) prepareTimer.Stop() @@ -212,6 +215,9 @@ func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputForma // EvalToVector evaluates the given node into a Vector. Matrices aren't supported. func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { + totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() + defer totalEvalTimer.Stop() + prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) prepareTimer.Stop() diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index 37f4e3da2e..f606773d44 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -110,22 +110,35 @@ func (i *iteratorInitializer) Visit(node Node) { } func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { + totalTimer := queryStats.GetTimer(stats.TotalEvalTime) + analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzer := NewQueryAnalyzer(storage) Walk(analyzer, node) analyzeTimer.Stop() - // TODO: Preloading should time out after a given duration. preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start() p := storage.NewPreloader() for fp, rangeDuration := range analyzer.FullRanges { + if et := totalTimer.ElapsedTime(); et > *queryTimeout { + preloadTimer.Stop() + p.Close() + return nil, queryTimeoutError{et} + } if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp, *stalenessDelta); err != nil { + preloadTimer.Stop() p.Close() return nil, err } } for fp := range analyzer.IntervalRanges { + if et := totalTimer.ElapsedTime(); et > *queryTimeout { + preloadTimer.Stop() + p.Close() + return nil, queryTimeoutError{et} + } if err := p.PreloadRange(fp, timestamp, timestamp, *stalenessDelta); err != nil { + preloadTimer.Stop() p.Close() return nil, err } @@ -141,16 +154,23 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage loc } func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { + totalTimer := queryStats.GetTimer(stats.TotalEvalTime) + analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzer := NewQueryAnalyzer(storage) Walk(analyzer, node) analyzeTimer.Stop() - // TODO: Preloading should time out after a given duration. preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start() p := storage.NewPreloader() for fp, rangeDuration := range analyzer.FullRanges { + if et := totalTimer.ElapsedTime(); et > *queryTimeout { + preloadTimer.Stop() + p.Close() + return nil, queryTimeoutError{et} + } if err := p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta); err != nil { + preloadTimer.Stop() p.Close() return nil, err } @@ -169,7 +189,13 @@ func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.T */ } for fp := range analyzer.IntervalRanges { + if et := totalTimer.ElapsedTime(); et > *queryTimeout { + preloadTimer.Stop() + p.Close() + return nil, queryTimeoutError{et} + } if err := p.PreloadRange(fp, start, end, *stalenessDelta); err != nil { + preloadTimer.Stop() p.Close() return nil, err } diff --git a/stats/timer.go b/stats/timer.go index 6ee915c567..461a108300 100644 --- a/stats/timer.go +++ b/stats/timer.go @@ -40,6 +40,11 @@ func (t *Timer) Stop() { t.duration += time.Since(t.start) } +// ElapsedTime returns the time that passed since starting the timer. +func (t *Timer) ElapsedTime() time.Duration { + return time.Since(t.start) +} + // Return a string representation of the Timer. func (t *Timer) String() string { return fmt.Sprintf("%s: %s", t.name, t.duration) diff --git a/web/api/query.go b/web/api/query.go index f0616cbffa..7f5bfb16ab 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -65,7 +65,6 @@ func (serv MetricsService) Query(w http.ResponseWriter, r *http.Request) { } timestamp := clientmodel.TimestampFromTime(serv.time.Now()) - queryStats := stats.NewTimerGroup() result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats) glog.V(1).Infof("Instant query: %s\nQuery stats:\n%s\n", expr, queryStats) @@ -123,7 +122,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) { queryStats := stats.NewTimerGroup() - evalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() matrix, err := ast.EvalVectorRange( exprNode.(ast.VectorNode), clientmodel.TimestampFromUnixNano(end-duration), @@ -135,7 +133,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, ast.ErrorToJSON(err)) return } - evalTimer.Stop() sortTimer := queryStats.GetTimer(stats.ResultSortTime).Start() sort.Sort(matrix)