add --max-block-duration in promtool create-blocks-from rules (#9511)

* support maxBlockDuration for promtool tsdb create-blocks-from rules

Fixes #9465

Signed-off-by: Will Tran <will@autonomic.ai>

* don't hardcode 2h as the default block size in rules test

Signed-off-by: Will Tran <will@autonomic.ai>
Will Tran 2021-10-21 17:28:37 -04:00 committed by GitHub
parent d81bbe154d
commit 97b0738895
4 changed files with 44 additions and 30 deletions
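
Usage note: the rules backfiller now honors the same --max-block-duration flag that the OpenMetrics backfill path already accepts (note the unchanged backfillOpenMetrics call in the main.go diff below). A sketch of an invocation; the --start/--end/--url/--output-dir spellings follow promtool's documented flags for this command, and the values are purely illustrative:

promtool tsdb create-blocks-from rules \
    --start 2021-10-01T00:00:00Z \
    --end 2021-10-08T00:00:00Z \
    --url http://localhost:9090 \
    --output-dir data/ \
    --max-block-duration 24h \
    rules.yaml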

View File

@@ -66,7 +66,7 @@ func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) {
 	return maxt, mint, nil
 }
 
-func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool) (returnErr error) {
+func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
 	blockDuration := tsdb.DefaultBlockDuration
 	if maxBlockDuration > tsdb.DefaultBlockDuration {
 		ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 10, 3)
@@ -79,6 +79,11 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 		}
 		blockDuration = ranges[idx]
 	}
+	return blockDuration
+}
+
+func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool) (returnErr error) {
+	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 	mint = blockDuration * (mint / blockDuration)
 
 	db, err := tsdb.OpenDBReadOnly(outputDir, nil)
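
The refactor above extracts the block-duration capping out of createBlocks so the rule importer can reuse it. A self-contained sketch of that selection logic, with the candidate ranges reimplemented inline (the inline helper is an assumption standing in for tsdb.ExponentialBlockRanges, and the idx loop reconstructs the unchanged lines the diff elides between the two hunks):

package main

import (
	"fmt"
	"time"
)

// defaultBlockDuration mirrors tsdb.DefaultBlockDuration: 2h in milliseconds.
const defaultBlockDuration = int64(2 * time.Hour / time.Millisecond)

// exponentialBlockRanges stands in for tsdb.ExponentialBlockRanges: it returns
// `steps` sizes, starting at minSize and multiplying by stepSize each step.
func exponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	cur := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, cur)
		cur *= int64(stepSize)
	}
	return ranges
}

// getCompatibleBlockDuration picks the largest candidate range that does not
// exceed maxBlockDuration, falling back to the 2h default for smaller caps.
func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
	blockDuration := defaultBlockDuration
	if maxBlockDuration > defaultBlockDuration {
		ranges := exponentialBlockRanges(defaultBlockDuration, 10, 3)
		idx := len(ranges) - 1 // keep the largest range if the cap is enormous
		for i, v := range ranges {
			if v > maxBlockDuration {
				idx = i - 1
				break
			}
		}
		blockDuration = ranges[idx]
	}
	return blockDuration
}

func main() {
	capMs := int64(24 * time.Hour / time.Millisecond)
	got := getCompatibleBlockDuration(capMs)
	fmt.Println(time.Duration(got) * time.Millisecond) // 18h0m0s
}

So a 24h cap yields 18h blocks: the candidates grow as 2h, 6h, 18h, 54h, and 18h is the largest one not exceeding the cap.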

View File

@@ -250,7 +250,7 @@ func main() {
 		os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
 
 	case importRulesCmd.FullCommand():
-		os.Exit(checkErr(importRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *importRulesFiles...)))
+		os.Exit(checkErr(importRules(*importRulesURL, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, *importRulesFiles...)))
 	}
 }
@@ -927,7 +927,7 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) {
 // importRules backfills recording rules from the files provided. The output are blocks of data
 // at the outputDir location.
-func importRules(url *url.URL, start, end, outputDir string, evalInterval time.Duration, files ...string) error {
+func importRules(url *url.URL, start, end, outputDir string, evalInterval time.Duration, maxBlockDuration time.Duration, files ...string) error {
 	ctx := context.Background()
 	var stime, etime time.Time
 	var err error
@@ -950,10 +950,11 @@ func importRules(url *url.URL, start, end, outputDir string, evalInterval time.D
 	}
 
 	cfg := ruleImporterConfig{
-		outputDir:    outputDir,
-		start:        stime,
-		end:          etime,
-		evalInterval: evalInterval,
+		outputDir:        outputDir,
+		start:            stime,
+		end:              etime,
+		evalInterval:     evalInterval,
+		maxBlockDuration: maxBlockDuration,
 	}
 
 	client, err := api.NewClient(api.Config{
 		Address: url.String(),
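
importRules gains a maxBlockDuration parameter, fed from the dereferenced *maxBlockDuration kingpin flag it now shares with the OpenMetrics importer. A hypothetical, minimal sketch of declaring and reading such a Duration flag with kingpin v2 (the app name, help text, and default below are invented for illustration; they are not promtool's real definitions):

package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("promtool-sketch", "Illustration of a shared Duration flag.")
	// Hypothetical flag declaration; kingpin parses values like "24h"
	// into a *time.Duration.
	maxBlockDuration := app.Flag("max-block-duration",
		"Maximum duration created blocks may span.").Default("2h").Duration()
	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("max block duration:", *maxBlockDuration)
}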

View File

@@ -48,10 +48,11 @@ type ruleImporter struct {
 }
 
 type ruleImporterConfig struct {
-	outputDir    string
-	start        time.Time
-	end          time.Time
-	evalInterval time.Duration
+	outputDir        string
+	start            time.Time
+	end              time.Time
+	evalInterval     time.Duration
+	maxBlockDuration time.Duration
 }
 
 // newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
@@ -83,7 +84,7 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
 		for i, r := range group.Rules() {
 			level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
-			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, group); err != nil {
+			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
 				errs = append(errs, err)
 			}
 		}
@@ -92,8 +93,9 @@ func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
 }
 
 // importRule queries a prometheus API to evaluate rules at times in the past.
-func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, grp *rules.Group) (err error) {
-	blockDuration := tsdb.DefaultBlockDuration
+func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
+	maxBlockDuration int64, grp *rules.Group) (err error) {
+	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 	startInMs := start.Unix() * int64(time.Second/time.Millisecond)
 	endInMs := end.Unix() * int64(time.Second/time.Millisecond)
@@ -130,7 +132,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName
 			// also need to append samples throughout the whole block range. To allow that, we
 			// pretend that the block is twice as large here, but only really add sample in the
 			// original interval later.
-			w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*tsdb.DefaultBlockDuration)
+			w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
 			if err != nil {
 				return errors.Wrap(err, "new block writer")
 			}
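
Two details in this file are easy to miss: the flag's time.Duration is converted to the TSDB's integer milliseconds before importRule runs, and the block writer is now sized at 2*blockDuration (previously 2*tsdb.DefaultBlockDuration) so samples can be appended across the whole pretend-larger block. A minimal sketch of the unit conversion:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same conversion as int64(importer.config.maxBlockDuration / time.Millisecond):
	// a time.Duration counts nanoseconds, the TSDB APIs count milliseconds.
	maxBlockDuration := 24 * time.Hour
	ms := int64(maxBlockDuration / time.Millisecond)
	fmt.Println(ms) // 86400000
}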

View File

@@ -38,6 +38,8 @@ func (mockAPI mockQueryRangeAPI) QueryRange(ctx context.Context, query string, r
 	return mockAPI.samples, v1.Warnings{}, nil
 }
 
+const defaultBlockDuration = time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond
+
 // TestBackfillRuleIntegration is an integration test that runs all the rule importer code to confirm the parts work together.
 func TestBackfillRuleIntegration(t *testing.T) {
 	const (
@@ -46,23 +48,26 @@ func TestBackfillRuleIntegration(t *testing.T) {
 		testValue2 = 98
 	)
 	var (
-		start     = time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)
-		testTime  = model.Time(start.Add(-9 * time.Hour).Unix())
-		testTime2 = model.Time(start.Add(-8 * time.Hour).Unix())
+		start                     = time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)
+		testTime                  = model.Time(start.Add(-9 * time.Hour).Unix())
+		testTime2                 = model.Time(start.Add(-8 * time.Hour).Unix())
+		twentyFourHourDuration, _ = time.ParseDuration("24h")
 	)
 
 	var testCases = []struct {
 		name                string
 		runcount            int
+		maxBlockDuration    time.Duration
 		expectedBlockCount  int
 		expectedSeriesCount int
 		expectedSampleCount int
 		samples             []*model.SampleStream
 	}{
-		{"no samples", 1, 0, 0, 0, []*model.SampleStream{}},
-		{"run importer once", 1, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
-		{"run importer with dup name label", 1, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"__name__": "val1", "name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
-		{"one importer twice", 2, 8, 4, 8, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue2}}}}},
+		{"no samples", 1, defaultBlockDuration, 0, 0, 0, []*model.SampleStream{}},
+		{"run importer once", 1, defaultBlockDuration, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
+		{"run importer with dup name label", 1, defaultBlockDuration, 8, 4, 4, []*model.SampleStream{{Metric: model.Metric{"__name__": "val1", "name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
+		{"one importer twice", 2, defaultBlockDuration, 8, 4, 8, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}, {Timestamp: testTime2, Value: testValue2}}}}},
+		{"run importer once with larger blocks", 1, twentyFourHourDuration, 4, 4, 4, []*model.SampleStream{{Metric: model.Metric{"name1": "val1"}, Values: []model.SamplePair{{Timestamp: testTime, Value: testValue}}}}},
 	}
 
 	for _, tt := range testCases {
 		t.Run(tt.name, func(t *testing.T) {
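
The new defaultBlockDuration test constant rebuilds a time.Duration from tsdb.DefaultBlockDuration's millisecond count, the inverse of the conversion done in rules.go. A standalone sketch, assuming that constant is two hours in milliseconds:

package main

import (
	"fmt"
	"time"
)

// Assumed value of tsdb.DefaultBlockDuration: two hours in milliseconds.
const defaultBlockDurationMs = 2 * 60 * 60 * 1000

func main() {
	// Mirrors the test constant:
	//   time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond
	d := time.Duration(defaultBlockDurationMs) * time.Millisecond
	fmt.Println(d) // 2h0m0s
}

The added "larger blocks" case passes twentyFourHourDuration instead and expects 4 blocks rather than 8: a 24h cap selects 18h blocks, so the backfilled window is cut into fewer pieces.
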
@@ -76,7 +81,8 @@ func TestBackfillRuleIntegration(t *testing.T) {
 			// Execute the test more than once to simulate running the rule importer twice with the same data.
 			// We expect duplicate blocks with the same series are created when run more than once.
 			for i := 0; i < tt.runcount; i++ {
-				ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, tt.samples)
+
+				ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, tt.samples, tt.maxBlockDuration)
 				require.NoError(t, err)
 				path1 := filepath.Join(tmpDir, "test.file")
 				require.NoError(t, createSingleRuleTestFiles(path1))
@@ -162,13 +168,14 @@ func TestBackfillRuleIntegration(t *testing.T) {
 	}
 }
 
-func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix) (*ruleImporter, error) {
+func newTestRuleImporter(ctx context.Context, start time.Time, tmpDir string, testSamples model.Matrix, maxBlockDuration time.Duration) (*ruleImporter, error) {
 	logger := log.NewNopLogger()
 	cfg := ruleImporterConfig{
-		outputDir:    tmpDir,
-		start:        start.Add(-10 * time.Hour),
-		end:          start.Add(-7 * time.Hour),
-		evalInterval: 60 * time.Second,
+		outputDir:        tmpDir,
+		start:            start.Add(-10 * time.Hour),
+		end:              start.Add(-7 * time.Hour),
+		evalInterval:     60 * time.Second,
+		maxBlockDuration: maxBlockDuration,
 	}
 
 	return newRuleImporter(logger, cfg, mockQueryRangeAPI{
@@ -225,8 +232,7 @@ func TestBackfillLabels(t *testing.T) {
 			Values: []model.SamplePair{{Timestamp: model.TimeFromUnixNano(start.UnixNano()), Value: 123}},
 		},
 	}
-	ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, mockAPISamples)
+	ruleImporter, err := newTestRuleImporter(ctx, start, tmpDir, mockAPISamples, defaultBlockDuration)
 	require.NoError(t, err)
 
 	path := filepath.Join(tmpDir, "test.file")
 	recordingRules := `groups: