diff --git a/rules/fixtures/rules_indeterminates.yaml b/rules/fixtures/rules_indeterminates.yaml new file mode 100644 index 0000000000..a906d3b504 --- /dev/null +++ b/rules/fixtures/rules_indeterminates.yaml @@ -0,0 +1,18 @@ +groups: + - name: indeterminate + rules: + # This shouldn't run in parallel because of the open matcher + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) + - record: matcher + expr: '{job="job1"}' diff --git a/rules/manager.go b/rules/manager.go index 6e9bf64691..df3d48700a 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -453,6 +453,11 @@ type ruleDependencyController struct{} // AnalyseRules implements RuleDependencyController. func (c ruleDependencyController) AnalyseRules(rules []Rule) { depMap := buildDependencyMap(rules) + + if depMap == nil { + return + } + for _, r := range rules { r.SetNoDependentRules(depMap.dependents(r) == 0) r.SetNoDependencyRules(depMap.dependencies(r) == 0) diff --git a/rules/manager_test.go b/rules/manager_test.go index 6afac993d8..327addcbc1 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -2110,6 +2110,45 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } }) + + t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { + t.Parallel() + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} + maxInflight := atomic.Int32{} + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 7 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_indeterminates.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) } func TestBoundedRuleEvalConcurrency(t *testing.T) { @@ -2222,3 +2261,132 @@ func TestLabels_FromMaps(t *testing.T) { require.Equal(t, expected, mLabels, "unexpected labelset") } + +func TestRuleDependencyController_AnalyseRules(t *testing.T) { + type expectedDependencies struct { + noDependentRules bool + noDependencyRules bool + } + + testCases := []struct { + name string + ruleFile string + expected map[string]expectedDependencies + }{ + { + name: "all independent rules", + ruleFile: "fixtures/rules_multiple_independent.yaml", + expected: map[string]expectedDependencies{ + "job:http_requests:rate1m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate5m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate15m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate30m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate1h": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate2h": { + noDependentRules: true, + noDependencyRules: true, + }, + }, + }, + { + name: "some dependent rules", + ruleFile: "fixtures/rules_multiple.yaml", + expected: map[string]expectedDependencies{ + "job:http_requests:rate1m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate5m": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests:rate15m": { + noDependentRules: false, + noDependencyRules: true, + }, + "TooManyRequests": { + noDependentRules: true, + noDependencyRules: false, + }, + }, + }, + { + name: "indeterminate rules", + ruleFile: "fixtures/rules_indeterminates.yaml", + expected: map[string]expectedDependencies{ + "job:http_requests:rate1m": { + noDependentRules: false, + noDependencyRules: false, + }, + "job:http_requests:rate5m": { + noDependentRules: false, + noDependencyRules: false, + }, + "job:http_requests:rate15m": { + noDependentRules: false, + noDependencyRules: false, + }, + "job:http_requests:rate30m": { + noDependentRules: false, + noDependencyRules: false, + }, + "job:http_requests:rate1h": { + noDependentRules: false, + noDependencyRules: false, + }, + "job:http_requests:rate2h": { + noDependentRules: false, + noDependencyRules: false, + }, + "matcher": { + noDependentRules: false, + noDependencyRules: false, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: promslog.NewNopLogger(), + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { return nil, nil }, + }) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, tc.ruleFile) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, g := range groups { + ruleManager.opts.RuleDependencyController.AnalyseRules(g.rules) + + for _, r := range g.rules { + exp, ok := tc.expected[r.Name()] + require.Truef(t, ok, "rule not found in expected: %s", r.String()) + require.Equalf(t, exp.noDependentRules, r.NoDependentRules(), "rule: %s", r.String()) + require.Equalf(t, exp.noDependencyRules, r.NoDependencyRules(), "rule: %s", r.String()) + } + } + }) + } +}