diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 1e201bbcf2..a0640c04de 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -877,7 +877,6 @@ func importRules(url *url.URL, start, end, outputDir string, evalInterval time.D } ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client)) - errs := ruleImporter.loadGroups(ctx, files) for _, err := range errs { if err != nil { diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 6d823794bd..5394c54b62 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -57,7 +57,6 @@ type ruleImporterConfig struct { // newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series // written to disk in blocks. func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter { - return &ruleImporter{ logger: logger, config: config, @@ -84,7 +83,7 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) { stimeWithAlignment := group.EvalTimestamp(importer.config.start.UnixNano()) for i, r := range group.Rules() { level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name())) - if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group.Interval()); err != nil { + if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, importer.config.end, group); err != nil { errs = append(errs, err) } } @@ -93,18 +92,23 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) { } // importRule queries a prometheus API to evaluate rules at times in the past. -func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) (err error) { +func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) { blockDuration := tsdb.DefaultBlockDuration - startOfBlock := blockDuration * (start.Unix() / blockDuration) - for t := startOfBlock; t <= end.Unix(); t = t + blockDuration { - endOfBlock := t + blockDuration + startInMs := start.Unix() * int64(time.Second/time.Millisecond) + startOfBlock := blockDuration * (startInMs / blockDuration) + endInMs := end.Unix() * int64(time.Second/time.Millisecond) + for s := startOfBlock; s <= endInMs; s = s + blockDuration { + endOfBlock := s + blockDuration - 1 + + currStart := max(s/int64(time.Second/time.Millisecond), start.Unix()) + startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UnixNano()) val, warnings, err := importer.apiClient.QueryRange(ctx, ruleExpr, v1.Range{ - Start: time.Unix(t, 0), - End: time.Unix(endOfBlock, 0), - Step: interval, + Start: startWithAlignment, + End: time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0), + Step: grp.Interval(), }, ) if err != nil { @@ -174,8 +178,7 @@ func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *mu } // multipleAppender keeps track of how many series have been added to the current appender. -// If the max samples have been added, then all series are flushed to disk and commited and a new -// appender is created. +// If the max samples have been added, then all series are commited and a new appender is created. type multipleAppender struct { maxSamplesInMemory int currentSampleCount int @@ -215,3 +218,17 @@ func (m *multipleAppender) flushAndCommit(ctx context.Context) error { } return nil } + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 8953841fb2..a4c16102ca 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -46,6 +46,12 @@ func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r return mockAPI.samples, v1.Warnings{}, nil } +func getTestProdData() []*model.SampleStream { + var result = []*model.SampleStream{} + + return result +} + // TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together. func TestBackfillRuleIntegration(t *testing.T) { var testCases = []struct { @@ -105,7 +111,7 @@ func TestBackfillRuleIntegration(t *testing.T) { require.Equal(t, "grp2_rule1_expr", g2Rules[0].Query().String()) require.Equal(t, 0, len(g2Rules[0].Labels())) - // Backfill all recording rules then check the blocks to confirm the right data was created. + // Backfill all recording rules then check the blocks to confirm the correct data was created. errs = ruleImporter.importAll(ctx) for _, err := range errs { require.NoError(t, err) @@ -159,22 +165,15 @@ func TestBackfillRuleIntegration(t *testing.T) { func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) { logger := log.NewNopLogger() cfg := ruleImporterConfig{ - Start: start.Add(-1 * time.Hour), - End: start, - EvalInterval: 60 * time.Second, - } - writer, err := tsdb.NewBlockWriter(logger, - tmpDir, - tsdb.DefaultBlockDuration, - ) - if err != nil { - return nil, err + outputDir: tmpDir, + start: start.Add(-1 * time.Hour), + end: start, + evalInterval: 60 * time.Second, } - app := newMultipleAppender(ctx, testMaxSampleCount, writer) return newRuleImporter(logger, cfg, mockQueryRangeAPI{ samples: testSamples, - }, app), nil + }), nil } func createSingleRuleTestFiles(path string) error { @@ -182,7 +181,7 @@ func createSingleRuleTestFiles(path string) error { - name: group0 rules: - record: rule1 - expr: ruleExpr + expr: ruleExpr labels: testlabel11: testlabelvalue11 `