diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index edddaf8b0f..0c78d93e7f 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -41,7 +41,6 @@ import (
 	"gopkg.in/alecthomas/kingpin.v2"
 
 	"github.com/prometheus/prometheus/config"
-	"github.com/prometheus/prometheus/importers"
 	"github.com/prometheus/prometheus/pkg/rulefmt"
 )
 
@@ -132,16 +131,16 @@ func main() {
 
 	backfillCmd := app.Command("backfill", "Backfill Prometheus data.")
 	backfillRuleCmd := backfillCmd.Command("rules", "Backfill Prometheus data for new rules.")
-	backfillRuleURL := backfillRuleCmd.Flag("url", "Prometheus API url.").Required().String()
-	backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling.").
-		Default("-3h").Duration()
 	backfillRuleStart := backfillRuleCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp.").
-		Required().Duration()
-	backfillRuleEnd := backfillRuleCmd.Flag("end", "If an end time is provided, the new rule backfilling will end at this time. The default will backfill to the 3 hrs ago. End time should be RFC3339 or Unix timestamp.").
-		Default("").Duration()
+		Required().String()
+	backfillRuleEnd := backfillRuleCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. The default will backfill up to 3 hours ago. End time should be RFC3339 or Unix timestamp.").
+		Default("-3h").String()
+	backfillRuleURL := backfillRuleCmd.Flag("url", "The URL for the Prometheus API with the data where the rule will be backfilled from.").Default("localhost:9090").String()
+	backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling.").
+		Default("15s").Duration()
 	backfillRuleFiles := backfillRuleCmd.Arg(
 		"rule-files",
-		"The file containing the new rule that needs to be backfilled.",
+		"A list of one or more files containing recording rules to be backfilled. All recording rules listed in the files will be backfilled. Alerting rules are not evaluated.",
 	).Required().ExistingFiles()
 
 	parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
@@ -767,24 +766,29 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
 	json.NewEncoder(os.Stdout).Encode(v)
 }
 
-// BackfillRule backfills rules from the files provided
-func BackfillRule(url string, start, end, evalInterval time.Duration, files ...string) int {
+// BackfillRule backfills rules from the files provided.
+func BackfillRule(url, start, end string, evalInterval time.Duration, files ...string) int {
 	ctx := context.Background()
-	cfg := importers.RuleConfig{
-		Start:        start.String(),
-		End:          end.String(),
+	stime, etime, err := parseStartTimeAndEndTime(start, end)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		return 1
+	}
+	cfg := RuleImporterConfig{
+		Start:        stime,
+		End:          etime,
 		EvalInterval: evalInterval,
 		URL:          url,
 	}
 	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
-	ruleImporter := importers.NewRuleImporter(logger, cfg)
-	err := ruleImporter.Init()
+	ruleImporter := NewRuleImporter(logger, cfg)
+	err = ruleImporter.Init()
 	if err != nil {
 		fmt.Fprintln(os.Stderr, "rule importer init error", err)
 		return 1
 	}
-	errs := ruleImporter.Parse(ctx, files)
+	errs := ruleImporter.LoadGroups(ctx, files)
 	for _, err := range errs {
 		if err != nil {
 			fmt.Fprintln(os.Stderr, "rule importer parse error", err)
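For reference, the reworked flags above translate into an invocation along these lines (a sketch: rules.yml is a placeholder file name, and --url and --evaluation_interval can be dropped to fall back to their localhost:9090 and 15s defaults):

    promtool backfill rules \
        --start=2020-08-01T00:00:00Z \
        --end=2020-08-04T00:00:00Z \
        --url=localhost:9090 \
        --evaluation_interval=15s \
        rules.yml

Per the flag help, the start and end values may be given as RFC3339 or Unix timestamps.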
diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go
new file mode 100644
index 0000000000..52bf226880
--- /dev/null
+++ b/cmd/promtool/rules.go
@@ -0,0 +1,216 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/api"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/rules"
+	"github.com/prometheus/prometheus/tsdb/importer/blocks"
+)
+
+const blockSize = 2 // in hours
+
+// RuleImporter is the importer to backfill rules.
+type RuleImporter struct {
+	logger log.Logger
+	config RuleImporterConfig
+
+	groups      map[string]*rules.Group
+	groupLoader rules.GroupLoader
+
+	apiClient v1.API
+
+	writer *blocks.MultiWriter
+}
+
+// RuleImporterConfig is the config for the rule importer.
+type RuleImporterConfig struct {
+	Start        time.Time
+	End          time.Time
+	EvalInterval time.Duration
+	URL          string
+}
+
+// NewRuleImporter creates a new rule importer that can be used to backfill rules.
+func NewRuleImporter(logger log.Logger, config RuleImporterConfig) *RuleImporter {
+	return &RuleImporter{
+		logger:      logger,
+		config:      config,
+		groupLoader: rules.FileLoader{},
+	}
+}
+
+// Init initializes the rule importer which creates a new block writer
+// and creates a Prometheus API client.
+func (importer *RuleImporter) Init() error {
+	// todo: clean up dir
+	newBlockDir, err := ioutil.TempDir("", "rule_blocks")
+	if err != nil {
+		return err
+	}
+	importer.writer = blocks.NewMultiWriter(importer.logger, newBlockDir, importer.config.EvalInterval.Nanoseconds())
+
+	config := api.Config{
+		Address: importer.config.URL,
+	}
+	c, err := api.NewClient(config)
+	if err != nil {
+		return err
+	}
+	importer.apiClient = v1.NewAPI(c)
+	return nil
+}
+
+// Close cleans up any open resources.
+func (importer *RuleImporter) Close() error {
+	// todo: clean up any dirs that were created
+	return importer.writer.Close()
+}
+
+// LoadGroups reads groups from a list of rule files.
+func (importer *RuleImporter) LoadGroups(ctx context.Context, filenames []string) (errs []error) {
+	groups := make(map[string]*rules.Group)
+
+	for _, filename := range filenames {
+		rgs, errs := importer.groupLoader.Load(filename)
+		if errs != nil {
+			return errs
+		}
+
+		for _, ruleGroup := range rgs.Groups {
+			itv := importer.config.EvalInterval
+			if ruleGroup.Interval != 0 {
+				itv = time.Duration(ruleGroup.Interval)
+			}
+
+			rgRules := make([]rules.Rule, 0, len(ruleGroup.Rules))
+
+			for _, r := range ruleGroup.Rules {
+				expr, err := importer.groupLoader.Parse(r.Expr.Value)
+				if err != nil {
+					return []error{errors.Wrap(err, filename)}
+				}
+
+				rgRules = append(rgRules, rules.NewRecordingRule(
+					r.Record.Value,
+					expr,
+					labels.FromMap(r.Labels),
+				))
+			}
+
+			groups[rules.GroupKey(filename, ruleGroup.Name)] = rules.NewGroup(rules.GroupOptions{
+				Name:     ruleGroup.Name,
+				File:     filename,
+				Interval: itv,
+				Rules:    rgRules,
+			})
+		}
+	}
+
+	importer.groups = groups
+	return nil
+}
+
+// ImportAll evaluates all the groups and rules and creates new time series
+// and stores them in new blocks.
+func (importer *RuleImporter) ImportAll(ctx context.Context) []error {
+	var errs = []error{}
+	for _, group := range importer.groups {
+		stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
+
+		for _, r := range group.Rules() {
+			err := importer.ImportRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval())
+			if err != nil {
+				errs = append(errs, err)
+			}
+		}
+	}
+	_, err := importer.writer.Flush()
+	if err != nil {
+		errs = append(errs, err)
+	}
+	return errs
+}
+
+// ImportRule imports the historical data for a single rule.
+func (importer *RuleImporter) ImportRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, interval time.Duration) error {
+	ts := stimeWithAlignment
+
+	appender := importer.writer.Appender()
+
+	for ts.Before(importer.config.End) {
+		currentBlockEnd := ts.Add(blockSize * time.Hour)
+		if currentBlockEnd.After(importer.config.End) {
+			currentBlockEnd = importer.config.End
+		}
+
+		val, warnings, err := importer.apiClient.QueryRange(ctx,
+			ruleExpr,
+			v1.Range{
+				Start: ts,
+				End:   currentBlockEnd,
+				Step:  importer.config.EvalInterval,
+			},
+		)
+		if err != nil {
+			return err
+		}
+		if warnings != nil {
+			fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
+		}
+
+		var matrix model.Matrix
+		switch val.Type() {
+		case model.ValMatrix:
+			matrix = val.(model.Matrix)
+			for _, sample := range matrix {
+				currentLabels := make(labels.Labels, 0, len(sample.Metric))
+				for k, v := range sample.Metric {
+					currentLabels = append(currentLabels, labels.Label{
+						Name:  string(k),
+						Value: string(v),
+					})
+				}
+				for _, value := range sample.Values {
+					// model.Time is milliseconds since epoch, which is the
+					// resolution the appender expects.
+					_, err := appender.Add(currentLabels, int64(value.Timestamp), float64(value.Value))
+					if err != nil {
+						// todo: handle other errors, e.g. ErrOutOfOrderSample and ErrDuplicateSampleForTimestamp
+						return err
+					}
+				}
+			}
+		default:
+			return errors.New("rule result is wrong type")
+		}
+
+		ts = currentBlockEnd
+	}
+	_, err := importer.writer.Flush()
+	if err != nil {
+		return err
+	}
+	return appender.Commit()
+}
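ImportRule above pages through the requested range in blockSize-hour windows, clamping the final window to the end time, so each QueryRange call covers at most one default-sized (2h) TSDB block. A minimal standalone sketch of that loop shape, with arbitrary illustrative dates:

    package main

    import (
    	"fmt"
    	"time"
    )

    // blockSize mirrors the constant in rules.go above; query windows are
    // capped at 2 hours, the default TSDB block range.
    const blockSize = 2

    func main() {
    	start := time.Date(2020, 8, 1, 0, 30, 0, 0, time.UTC)
    	end := time.Date(2020, 8, 1, 7, 0, 0, 0, time.UTC)

    	// Walk [start, end) in blockSize-hour windows, clamping the last one,
    	// exactly as ImportRule advances ts to currentBlockEnd each iteration.
    	for ts := start; ts.Before(end); {
    		currentBlockEnd := ts.Add(blockSize * time.Hour)
    		if currentBlockEnd.After(end) {
    			currentBlockEnd = end
    		}
    		fmt.Printf("QueryRange window: %s -> %s\n",
    			ts.Format(time.RFC3339), currentBlockEnd.Format(time.RFC3339))
    		ts = currentBlockEnd
    	}
    }

This prints four windows: 00:30-02:30, 02:30-04:30, 04:30-06:30, and a clamped 06:30-07:00.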
diff --git a/importers/rules.go b/importers/rules.go
deleted file mode 100644
index a103eadb16..0000000000
--- a/importers/rules.go
+++ /dev/null
@@ -1,256 +0,0 @@
-// Copyright 2020 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package importers
-
-import (
-	"context"
-	"fmt"
-	"io/ioutil"
-	"math"
-	"net/url"
-	"os"
-	"sort"
-	"strconv"
-	"time"
-
-	"github.com/go-kit/kit/log"
-	"github.com/pkg/errors"
-	"github.com/prometheus/client_golang/api"
-	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
-	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/pkg/labels"
-	plabels "github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
-	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/promql/parser"
-	"github.com/prometheus/prometheus/rules"
-	"github.com/prometheus/prometheus/tsdb/importer/blocks"
-)
-
-// RuleImporter is the importer for rules
-type RuleImporter struct {
-	logger log.Logger
-
-	config RuleConfig
-	groups map[string]*rules.Group
-
-	apiClient v1.API
-
-	writer *blocks.MultiWriter
-}
-
-// RuleConfig is the config for the rule importer
-type RuleConfig struct {
-	Start        string
-	End          string
-	EvalInterval time.Duration
-	URL          string
-}
-
-// NewRuleImporter creates a new rule importer
-func NewRuleImporter(logger log.Logger, config RuleConfig) *RuleImporter {
-	return &RuleImporter{
-		config: config,
-	}
-}
-
-// Init initializes the rule importer which creates a new block writer
-// and creates an Prometheus API client
-func (importer *RuleImporter) Init() error {
-	// create new block writer
-	newBlockDir, err := ioutil.TempDir("", "rule_blocks")
-	if err != nil {
-		return err
-	}
-	importer.writer = blocks.NewMultiWriter(importer.logger, newBlockDir, importer.config.EvalInterval.Nanoseconds())
-
-	// create api client
-	config := api.Config{
-		Address: importer.config.URL,
-	}
-	c, err := api.NewClient(config)
-	if err != nil {
-		return err
-	}
-	importer.apiClient = v1.NewAPI(c)
-	return nil
-}
-
-// Close cleans up any open resources
-func (importer *RuleImporter) Close() error {
-	return importer.writer.Close()
-}
-
-// Parse parses the groups and rules from a list of rules files
-func (importer *RuleImporter) Parse(ctx context.Context, files []string) (errs []error) {
-	groups := make(map[string]*rules.Group)
-
-	for _, file := range files {
-		ruleGroups, errs := rulefmt.ParseFile(file)
-		if errs != nil {
-			return errs
-		}
-
-		for _, ruleGroup := range ruleGroups.Groups {
-			itv := importer.config.EvalInterval
-			if ruleGroup.Interval != 0 {
-				itv = time.Duration(ruleGroup.Interval)
-			}
-
-			rulez := make([]rules.Rule, 0, len(ruleGroup.Rules))
-			for _, r := range ruleGroup.Rules {
-				expr, err := parser.ParseExpr(r.Expr.Value)
-				if err != nil {
-					return []error{errors.Wrap(err, file)}
-				}
-
-				rulez = append(rulez, rules.NewRecordingRule(
-					r.Record.Value,
-					expr,
-					labels.FromMap(r.Labels),
-				))
-			}
-
-			groups[file+";"+ruleGroup.Name] = rules.NewGroup(rules.GroupOptions{
-				Name:     ruleGroup.Name,
-				File:     file,
-				Interval: itv,
-				Rules:    rulez,
-			})
-		}
-	}
-
-	importer.groups = groups
-	return errs
-}
-
-// ImportAll evaluates all the groups and rules and creates new time series
-// and stores in new blocks
-func (importer *RuleImporter) ImportAll(ctx context.Context) []error {
-	var errs = []error{}
-	for _, group := range importer.groups {
-		for _, rule := range group.Rules() {
-			err := importer.ImportRule(ctx, rule)
-			if err != nil {
-				errs = append(errs, err)
-			}
-		}
-	}
-	err := importer.CreateBlocks()
-	if err != nil {
-		errs = append(errs, err)
-	}
-	return errs
-}
-
-func (importer *RuleImporter) queryFn(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
-	val, warnings, err := importer.apiClient.Query(ctx, q, t)
-	if err != nil {
-		return promql.Vector{}, err
-	}
-	if warnings != nil {
-		fmt.Fprint(os.Stderr, "warning api.Query:", warnings)
-	}
-
-	switch val.Type() {
-	case model.ValVector:
-		valVector := val.(model.Vector)
-		return modelToPromqlVector(valVector), nil
-	case model.ValScalar:
-		valScalar := val.(*model.Scalar)
-		return promql.Vector{promql.Sample{
-			Metric: labels.Labels{},
-			Point:  promql.Point{T: int64(valScalar.Timestamp), V: float64(valScalar.Value)},
-		}}, nil
-	default:
-		return nil, errors.New("rule result is wrong type")
-	}
-}
-
-func modelToPromqlVector(modelValue model.Vector) promql.Vector {
-	result := make(promql.Vector, 0, len(modelValue))
-
-	for _, value := range modelValue {
-		labels := make(labels.Labels, 0, len(value.Metric))
-
-		for k, v := range value.Metric {
-			labels = append(labels, plabels.Label{
-				Name:  string(k),
-				Value: string(v),
-			})
-		}
-		sort.Sort(labels)
-
-		result = append(result, promql.Sample{
-			Metric: labels,
-			Point:  promql.Point{T: int64(value.Timestamp), V: float64(value.Value)},
-		})
-	}
-	return result
-}
-
-// ImportRule imports the historical data for a single rule
-func (importer *RuleImporter) ImportRule(ctx context.Context, rule rules.Rule) error {
-	ts, err := parseTime(importer.config.Start)
-	if err != nil {
-		return err
-	}
-	end, err := parseTime(importer.config.End)
-	if err != nil {
-		return err
-	}
-	url, err := url.Parse(importer.config.URL)
-	if err != nil {
-		return err
-	}
-
-	appender := importer.writer.Appender()
-	for ts.Before(end) {
-		vector, err := rule.Eval(ctx, ts, importer.queryFn, url)
-		if err != nil {
-			return err
-		}
-		for _, sample := range vector {
-			// we don't AddFast here because we need to maintain the
-			// ref for each series bcs rule.Eval could return different labels,
-			// so that means you would need to map the ref to metric, but that is what Add does
-			// anyways so just use that
-			_, err := appender.Add(plabels.Labels{plabels.Label{Name: sample.String()}}, sample.T, sample.V)
-			if err != nil {
-				return err
-			}
-		}
-
-		ts.Add(importer.config.EvalInterval)
-		// todo: 2 hr blocks?
-	}
-	return appender.Commit()
-}
-
-func parseTime(s string) (time.Time, error) {
-	if t, err := strconv.ParseFloat(s, 64); err == nil {
-		s, ns := math.Modf(t)
-		return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
-	}
-	if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
-		return t, nil
-	}
-	return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s)
-}
-
-// CreateBlocks creates blocks for all the rule data
-func (importer *RuleImporter) CreateBlocks() error {
-	_, err := importer.writer.Flush()
-	return err
-}
diff --git a/rules/manager.go b/rules/manager.go
index c4344fa68e..ea7d104555 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -200,6 +200,8 @@ type Rule interface {
 	Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
 	// String returns a human-readable string representation of the rule.
 	String() string
+	// Query returns the rule query expression.
+	Query() parser.Expr
 	// SetLastErr sets the current error experienced by the rule.
 	SetLastError(error)
 	// LastErr returns the last error experienced by the rule.
@@ -262,7 +264,7 @@ func NewGroup(o GroupOptions) *Group {
 		metrics = NewGroupMetrics(o.Opts.Registerer)
 	}
 
-	key := groupKey(o.File, o.Name)
+	key := GroupKey(o.File, o.Name)
 	metrics.evalTotal.WithLabelValues(key)
 	metrics.evalFailures.WithLabelValues(key)
 	metrics.groupLastEvalTime.WithLabelValues(key)
@@ -302,7 +304,7 @@ func (g *Group) run(ctx context.Context) {
 	defer close(g.terminated)
 
 	// Wait an initial amount to have consistently slotted intervals.
-	evalTimestamp := g.evalTimestamp().Add(g.interval)
+	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
 	select {
 	case <-time.After(time.Until(evalTimestamp)):
 	case <-g.done:
@@ -455,7 +457,7 @@ func (g *Group) GetEvaluationDuration() time.Duration {
 
 // setEvaluationDuration sets the time in seconds the last evaluation took.
 func (g *Group) setEvaluationDuration(dur time.Duration) {
-	g.metrics.groupLastDuration.WithLabelValues(groupKey(g.file, g.name)).Set(dur.Seconds())
+	g.metrics.groupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds())
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
@@ -471,19 +473,19 @@ func (g *Group) GetEvaluationTimestamp() time.Time {
 
 // setEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule group was last evaluated.
 func (g *Group) setEvaluationTimestamp(ts time.Time) {
-	g.metrics.groupLastEvalTime.WithLabelValues(groupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
+	g.metrics.groupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9)
 
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
 	g.evaluationTimestamp = ts
 }
 
-// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
-func (g *Group) evalTimestamp() time.Time {
+// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
+func (g *Group) EvalTimestamp(startTime int64) time.Time {
 	var (
 		offset = int64(g.hash() % uint64(g.interval))
-		now    = time.Now().UnixNano()
-		adjNow = now - offset
+		start  = startTime
+		adjNow = start - offset
 		base   = adjNow - (adjNow % int64(g.interval))
 	)
 
@@ -567,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				rule.SetEvaluationTimestamp(t)
 			}(time.Now())
 
-			g.metrics.evalTotal.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+			g.metrics.evalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 
 			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
 			if err != nil {
@@ -576,7 +578,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 				if _, ok := err.(promql.ErrQueryCanceled); !ok {
 					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
 				}
-				g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
+				g.metrics.evalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
 				return
 			}
 
@@ -929,7 +931,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
 		// check if new group equals with the old group, if yes then skip it.
 		// If not equals, stop it and wait for it to finish the current iteration.
 		// Then copy it into the new group.
-		gn := groupKey(newg.file, newg.name)
+		gn := GroupKey(newg.file, newg.name)
 		oldg, ok := m.groups[gn]
 		delete(m.groups, gn)
 
@@ -1042,7 +1044,7 @@ func (m *Manager) LoadGroups(
 			))
 		}
 
-		groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
+		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name:     rg.Name,
 			File:     fn,
 			Interval: itv,
@@ -1057,8 +1059,8 @@ func (m *Manager) LoadGroups(
 	return groups, nil
 }
 
-// Group names need not be unique across filenames.
-func groupKey(file, name string) string {
+// GroupKey returns a unique key for a rule group; group names need not be unique across filenames.
+func GroupKey(file, name string) string {
 	return file + ";" + name
 }
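GroupKey and EvalTimestamp are exported above so that promtool can reuse the rule manager's grouping and slot-alignment logic. To make the slotting concrete, here is a minimal sketch of the arithmetic (the final base+offset return is an inference from the variables in the hunk; the actual return statement lies below the context shown):

    package main

    import (
    	"fmt"
    	"time"
    )

    // evalTimestamp mirrors Group.EvalTimestamp: each group is shifted within
    // its interval by a stable per-group offset derived from a hash, and the
    // start time is snapped to the immediately preceding slot boundary.
    func evalTimestamp(startTime int64, hash uint64, interval time.Duration) time.Time {
    	offset := int64(hash % uint64(interval))
    	adjNow := startTime - offset
    	base := adjNow - (adjNow % int64(interval))
    	return time.Unix(0, base+offset).UTC() // assumed return, see note above
    }

    func main() {
    	start := time.Date(2020, 8, 1, 0, 0, 37, 0, time.UTC)
    	// Two groups with different hashes land on different, stable slots
    	// within the same 15s interval.
    	fmt.Println(evalTimestamp(start.UnixNano(), 3_000_000_000, 15*time.Second)) // 00:00:33
    	fmt.Println(evalTimestamp(start.UnixNano(), 7_000_000_000, 15*time.Second)) // 00:00:37
    }

Because the result depends only on the group hash, the interval, and the given start time, backfilled samples land on the same timestamps that live evaluation by the rule manager would have produced.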