From c10780c966d9669c7edeb055d9c5528fdb438756 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 23 May 2013 21:29:27 +0200 Subject: [PATCH] Introduce telemetry for rule evaluator durations. This commit adds telemetry for the Prometheus expression rule evaluator, which will enable meta-Prometheus monitoring of customers to ensure that no instance is falling behind in answering routine queries. A few other sundry simplifications are introduced, too. --- rules/ast/ast.go | 5 ----- rules/manager.go | 24 ++++++++++++++++-------- rules/telemetry.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 13 deletions(-) create mode 100644 rules/telemetry.go diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 814f88d6d4..4d98b0ffbf 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -37,11 +37,6 @@ type groupedAggregation struct { groupCount int } -type labelValuePair struct { - label model.LabelName - value model.LabelValue -} - // ---------------------------------------------------------------------------- // Enums. diff --git a/rules/manager.go b/rules/manager.go index 82618c4b08..0954fce25d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -47,17 +47,21 @@ func NewRuleManager(results chan *Result, interval time.Duration, storage *metri interval: interval, storage: storage, } + // BUG(julius): Extract this so that the caller manages concurrency. go manager.run(results) return manager } func (m *ruleManager) run(results chan *Result) { - ticker := time.Tick(m.interval) + ticker := time.NewTicker(m.interval) + defer ticker.Stop() for { select { - case <-ticker: + case <-ticker.C: + start := time.Now() m.runIteration(results) + evalDurations.Add(map[string]string{intervalKey: m.interval.String()}, float64(time.Since(start)/time.Millisecond)) case <-m.done: log.Printf("RuleManager exiting...") break @@ -66,27 +70,31 @@ func (m *ruleManager) run(results chan *Result) { } func (m *ruleManager) Stop() { - m.done <- true + select { + case m.done <- true: + default: + } } func (m *ruleManager) runIteration(results chan *Result) { now := time.Now() wg := sync.WaitGroup{} + for _, rule := range m.rules { wg.Add(1) + // BUG(julius): Look at fixing thundering herd. go func(rule Rule) { + defer wg.Done() vector, err := rule.Eval(now, m.storage) - samples := model.Samples{} - for _, sample := range vector { - samples = append(samples, sample) - } + samples := make(model.Samples, len(vector)) + copy(samples, vector) m.results <- &Result{ Samples: samples, Err: err, } - wg.Done() }(rule) } + wg.Wait() } diff --git a/rules/telemetry.go b/rules/telemetry.go new file mode 100644 index 0000000000..fdb440c70f --- /dev/null +++ b/rules/telemetry.go @@ -0,0 +1,34 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rules + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + intervalKey = "interval" +) + +var ( + evalDurations = prometheus.NewHistogram(&prometheus.HistogramSpecification{ + Starts: prometheus.LogarithmicSizedBucketsFor(0, 10000), + BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100), + ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}}) + evalDuration = prometheus.NewCounter() +) + +func init() { + prometheus.Register("prometheus_evaluator_duration_ms", "The duration for each evaluation pool to execute.", prometheus.NilLabels, evalDurations) +}