From 954cad35b2f8511a34b9d9172893cab7232df94a Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Thu, 21 Aug 2025 16:57:57 +0200
Subject: [PATCH] Optimise concurrent rule evaluation for rules querying ALERTS and ALERTS_FOR_STATE (#17064)

* Optimise concurrent rule evaluation for rules querying ALERTS and ALERTS_FOR_STATE

Signed-off-by: Marco Pracucci

* Further optimised the case of ALERTS and ALERTS_FOR_STATE without alertname label matcher

Signed-off-by: Marco Pracucci

---------

Signed-off-by: Marco Pracucci
---
 rules/fixtures/rules_multiple.yaml           |   8 +-
 .../fixtures/rules_multiple_independent.yaml |   4 +
 rules/group.go                               |  40 ++-
 rules/manager_test.go                        | 248 +++++++++++-------
 4 files changed, 197 insertions(+), 103 deletions(-)

diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml
index db57bede1b..d8eb4394d4 100644
--- a/rules/fixtures/rules_multiple.yaml
+++ b/rules/fixtures/rules_multiple.yaml
@@ -7,8 +7,14 @@ groups:
       - record: job:http_requests:rate5m
         expr: sum by (job)(rate(http_requests_total[5m]))
 
-      # dependents
+      # dependents because the recorded metric name is matched
       - record: job:http_requests:rate15m
         expr: sum by (job)(rate(http_requests_total[15m]))
       - record: TooManyRequests
         expr: job:http_requests:rate15m > 100
+
+      # dependents because the ALERTS meta-metric is matched
+      - alert: TooManyFailures
+        expr: sum by (job)(rate(http_requests_failed_total[1m])) > 100
+      - alert: TooManyFailuresAlerts
+        expr: count(ALERTS{alertname="TooManyFailures"}) > 5
diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml
index e071be3eff..835fbe364e 100644
--- a/rules/fixtures/rules_multiple_independent.yaml
+++ b/rules/fixtures/rules_multiple_independent.yaml
@@ -13,3 +13,7 @@ groups:
         expr: sum by (job)(rate(http_requests_total[1h]))
       - record: job:http_requests:rate2h
         expr: sum by (job)(rate(http_requests_total[2h]))
+      - record: job:http_requests_unless_single_alert:rate2h
+        expr: sum by (job)(rate(http_requests_total[2h])) unless (count by (job) (ALERTS{alertname="alert_1"}) > 0)
+      - record: job:http_requests_unless_multiple_alerts:rate2h
+        expr: sum by (job)(rate(http_requests_total[2h])) unless (count by (job) (ALERTS{alertname=~"alert_1|alert_2"}) > 0)
diff --git a/rules/group.go b/rules/group.go
index ed727ff983..d2d4a69fc1 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -1092,13 +1092,12 @@ func (m dependencyMap) isIndependent(r Rule) bool {
 // buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
 //
-// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose
-// output an Alert rule depends will not be able to run concurrently.
+// Both alerting and recording rules can have dependents and dependencies. An alerting rule can have dependents if
+// another rule within the group queries the ALERTS or ALERTS_FOR_STATE meta-metrics.
 //
 // There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be
 // inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour:
 // - wildcard queriers like {cluster="prod1"} which would match every series with that label selector
-// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE
 //
 // Rules which are independent can run concurrently with no side-effects.
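+//
+// For illustration, a hypothetical group (not one of the fixtures touched by this change) could contain:
+//
+//	- record: job:errors:rate5m
+//	  expr: sum by (job)(rate(errors_total[5m]))
+//	- alert: HighErrorRate
+//	  expr: job:errors:rate5m > 10
+//	- alert: HighErrorRateStillFiring
+//	  expr: count(ALERTS{alertname="HighErrorRate"}) > 0
+//
+// Here HighErrorRate depends on job:errors:rate5m because its expression selects that metric name, and
+// HighErrorRateStillFiring depends on HighErrorRate because it queries the ALERTS meta-metric with a matching
+// "alertname" matcher. A rule is not evaluated concurrently with its dependencies, but unrelated rules in the
+// same group still can be.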
 func buildDependencyMap(rules []Rule) dependencyMap {
@@ -1138,22 +1137,43 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 				return nil
 			}
 
-			// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
-			// if they run concurrently.
-			if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) {
-				indeterminate = true
-				return nil
+			// Check whether the vector selector queries the "meta-metrics" ALERTS or ALERTS_FOR_STATE and, if so,
+			// look up the "alertname" label matcher (it may be absent).
+			nameMatchesAlerts := nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName)
+			var alertsNameMatcher *labels.Matcher
+			if nameMatchesAlerts {
+				for _, m := range n.LabelMatchers {
+					if m.Name == labels.AlertName {
+						alertsNameMatcher = m
+						break
+					}
+				}
 			}
 
-			// Find rules which depend on the output of this rule.
+			// Find the other rules in the group that this rule depends on.
 			for _, other := range rules {
+				// Rules are defined in order within a rule group. Once we reach this rule we can stop searching,
+				// because rules defined later in the group are, by specification, not dependencies of this rule.
+				// Later rules can still query this rule, but they are not strict dependencies that need to be honoured.
 				if other == rule {
-					continue
+					break
 				}
 
 				otherName := other.Name()
+
+				// If this rule's vector selector matches the other rule's name, then the other rule is a dependency.
 				if nameMatcher.Matches(otherName) {
 					dependencies[other] = append(dependencies[other], rule)
+					continue
+				}
+
+				// If this rule's vector selector queries the alerts meta-metrics and the other rule is an alerting
+				// rule, then check whether the "alertname" matches. If it does, the other rule is a dependency.
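+				// For example (hypothetical rules, not part of this change): an alerting rule named "HighErrorRate"
+				// would be recorded as a dependency of a later rule whose expression selects
+				// ALERTS{alertname="HighErrorRate"} or ALERTS_FOR_STATE{alertname=~"High.*"}, and also of a later
+				// rule that queries ALERTS without any "alertname" matcher.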
+ if _, otherIsAlertingRule := other.(*AlertingRule); nameMatchesAlerts && otherIsAlertingRule { + if alertsNameMatcher == nil || alertsNameMatcher.Matches(otherName) { + dependencies[other] = append(dependencies[other], rule) + } } } } diff --git a/rules/manager_test.go b/rules/manager_test.go index 1d18d4dd28..66898245a3 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1771,49 +1771,119 @@ func TestDependenciesEdgeCases(t *testing.T) { require.False(t, depMap.isIndependent(rule2)) }) - t.Run("rule querying ALERTS metric", func(t *testing.T) { - expr, err := parser.ParseExpr("sum(requests)") - require.NoError(t, err) - rule1 := NewRecordingRule("first", expr, labels.Labels{}) + for _, metaMetric := range []string{alertMetricName, alertForStateMetricName} { + t.Run(metaMetric, func(t *testing.T) { + t.Run("rule querying alerts meta-metric with alertname", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests) > 0") + require.NoError(t, err) + rule1 := NewAlertingRule("first", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) - expr, err = parser.ParseExpr(`sum(ALERTS{alertname="test"})`) - require.NoError(t, err) - rule2 := NewRecordingRule("second", expr, labels.Labels{}) + expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname="test"}) > 0`, metaMetric)) + require.NoError(t, err) + rule2 := NewAlertingRule("second", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) - group := NewGroup(GroupOptions{ - Name: "rule_group", - Interval: time.Second, - Rules: []Rule{rule1, rule2}, - Opts: opts, + expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname=~"first.*"}) > 0`, metaMetric)) + require.NoError(t, err) + rule3 := NewAlertingRule("third", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname!="first"}) > 0`, metaMetric)) + require.NoError(t, err) + rule4 := NewAlertingRule("fourth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + expr, err = parser.ParseExpr("sum(failures)") + require.NoError(t, err) + rule5 := NewRecordingRule("fifth", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(fmt.Sprintf(`fifth > 0 and sum(%s{alertname="fourth"}) > 0`, metaMetric)) + require.NoError(t, err) + rule6 := NewAlertingRule("sixth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2, rule3, rule4, rule5, rule6}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + + require.False(t, depMap.isIndependent(rule1)) + require.Empty(t, depMap.dependencies(rule1)) + require.ElementsMatch(t, depMap.dependents(rule1), []Rule{rule3}) + + require.False(t, depMap.isIndependent(rule2)) + require.Empty(t, depMap.dependencies(rule2)) + require.ElementsMatch(t, depMap.dependents(rule2), []Rule{rule4}) + + require.False(t, depMap.isIndependent(rule3)) + require.ElementsMatch(t, depMap.dependencies(rule3), []Rule{rule1}) + require.ElementsMatch(t, depMap.dependents(rule3), []Rule{rule4}) + + require.False(t, depMap.isIndependent(rule4)) + require.ElementsMatch(t, depMap.dependencies(rule4), []Rule{rule2, rule3}) + require.ElementsMatch(t, depMap.dependents(rule4), []Rule{rule6}) + + require.False(t, depMap.isIndependent(rule5)) + 
require.Empty(t, depMap.dependencies(rule5)) + require.ElementsMatch(t, depMap.dependents(rule5), []Rule{rule6}) + + require.False(t, depMap.isIndependent(rule6)) + require.ElementsMatch(t, depMap.dependencies(rule6), []Rule{rule4, rule5}) + require.Empty(t, depMap.dependents(rule6)) + }) + + t.Run("rule querying alerts meta-metric without alertname", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum(requests) > 0`) + require.NoError(t, err) + rule2 := NewAlertingRule("second", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s) > 0`, metaMetric)) + require.NoError(t, err) + rule3 := NewAlertingRule("third", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + expr, err = parser.ParseExpr("sum(failures)") + require.NoError(t, err) + rule4 := NewRecordingRule("fourth", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(fmt.Sprintf(`fourth > 0 and sum(%s) > 0`, metaMetric)) + require.NoError(t, err) + rule5 := NewAlertingRule("fifth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger()) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2, rule3, rule4, rule5}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + + require.True(t, depMap.isIndependent(rule1)) + require.Empty(t, depMap.dependencies(rule1)) + require.Empty(t, depMap.dependents(rule1)) + + require.False(t, depMap.isIndependent(rule2)) + require.Empty(t, depMap.dependencies(rule2)) + require.ElementsMatch(t, depMap.dependents(rule2), []Rule{rule3, rule5}) + + require.False(t, depMap.isIndependent(rule3)) + require.ElementsMatch(t, depMap.dependencies(rule3), []Rule{rule2}) + require.ElementsMatch(t, depMap.dependents(rule3), []Rule{rule5}) + + require.False(t, depMap.isIndependent(rule4)) + require.Empty(t, depMap.dependencies(rule4)) + require.ElementsMatch(t, depMap.dependents(rule4), []Rule{rule5}) + + require.False(t, depMap.isIndependent(rule5)) + require.ElementsMatch(t, depMap.dependencies(rule5), []Rule{rule2, rule3, rule4}) + require.Empty(t, depMap.dependents(rule5)) + }) }) - - depMap := buildDependencyMap(group.rules) - // A rule querying ALERTS metric causes the whole group to be indeterminate. - require.False(t, depMap.isIndependent(rule1)) - require.False(t, depMap.isIndependent(rule2)) - }) - - t.Run("rule querying ALERTS_FOR_STATE metric", func(t *testing.T) { - expr, err := parser.ParseExpr("sum(requests)") - require.NoError(t, err) - rule1 := NewRecordingRule("first", expr, labels.Labels{}) - - expr, err = parser.ParseExpr(`sum(ALERTS_FOR_STATE{alertname="test"})`) - require.NoError(t, err) - rule2 := NewRecordingRule("second", expr, labels.Labels{}) - - group := NewGroup(GroupOptions{ - Name: "rule_group", - Interval: time.Second, - Rules: []Rule{rule1, rule2}, - Opts: opts, - }) - - depMap := buildDependencyMap(group.rules) - // A rule querying ALERTS_FOR_STATE metric causes the whole group to be indeterminate. 
- require.False(t, depMap.isIndependent(rule1)) - require.False(t, depMap.isIndependent(rule2)) - }) + } } func TestNoMetricSelector(t *testing.T) { @@ -1877,35 +1947,6 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) { require.True(t, depMap.isIndependent(rule3)) } -func TestRulesDependentOnMetaMetrics(t *testing.T) { - ctx := context.Background() - opts := &ManagerOptions{ - Context: ctx, - Logger: promslog.NewNopLogger(), - } - - // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by - // the rule engine, and is therefore not independent. - expr, err := parser.ParseExpr("count(ALERTS)") - require.NoError(t, err) - rule := NewRecordingRule("alert_count", expr, labels.Labels{}) - - // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules). - expr, err = parser.ParseExpr("1") - require.NoError(t, err) - rule2 := NewRecordingRule("one", expr, labels.Labels{}) - - group := NewGroup(GroupOptions{ - Name: "rule_group", - Interval: time.Second, - Rules: []Rule{rule, rule2}, - Opts: opts, - }) - - depMap := buildDependencyMap(group.rules) - require.False(t, depMap.isIndependent(rule)) -} - func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { files := []string{"fixtures/rules.yaml"} ruleManager := NewManager(&ManagerOptions{ @@ -1986,10 +2027,11 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.Empty(t, errs) require.Len(t, groups, 1) - ruleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 for _, group := range groups { - require.Len(t, group.rules, ruleCount) + require.Len(t, group.rules, expectedRuleCount) start := time.Now() DefaultEvalIterationFunc(ctx, group, start) @@ -2001,9 +2043,9 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Never expect more than 1 inflight query at a time. require.EqualValues(t, 1, maxInflight.Load()) // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) // Group duration is higher than the sum of rule durations (group overhead). require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) } @@ -2019,7 +2061,8 @@ func TestAsyncRuleEvaluation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleCount := 4 + expectedRuleCount := 6 + expectedSampleCount := 4 opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) // Configure concurrency settings. @@ -2033,7 +2076,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, ruleCount) + require.Len(t, group.rules, expectedRuleCount) start := time.Now() DefaultEvalIterationFunc(ctx, group, start) @@ -2041,9 +2084,9 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. 
- require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } }) @@ -2057,7 +2100,8 @@ func TestAsyncRuleEvaluation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleCount := 6 + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) // Configure concurrency settings. @@ -2071,7 +2115,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, ruleCount) + require.Len(t, group.rules, expectedRuleCount) start := time.Now() DefaultEvalIterationFunc(ctx, group, start) @@ -2079,15 +2123,15 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order (isn't affected by concurrency settings) order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5}, + {0, 1, 2, 3, 4, 5, 6, 7}, }, order) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } }) @@ -2101,12 +2145,13 @@ func TestAsyncRuleEvaluation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleCount := 6 + expectedRuleCount := 8 + expectedSampleCount := expectedRuleCount opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) // Configure concurrency settings. opts.ConcurrentEvalsEnabled = true - opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2 opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) @@ -2115,7 +2160,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, ruleCount) + require.Len(t, group.rules, expectedRuleCount) start := time.Now() @@ -2124,15 +2169,15 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Expected evaluation order order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group) require.Equal(t, []ConcurrentRules{ - {0, 1, 2, 3, 4, 5}, + {0, 1, 2, 3, 4, 5, 6, 7}, }, order) // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) // Some rules should execute concurrently so should complete quicker. 
- require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds()) + // Each recording rule produces one vector. + require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples)) // Group duration is less than the sum of rule durations require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum()) } @@ -2532,6 +2577,9 @@ func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.I promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, + NotifyFunc: func(_ context.Context, _ string, _ ...*Alert) { + // No-op in tests. + }, } } @@ -2600,6 +2648,14 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) { noDependentRules: true, noDependencyRules: true, }, + "job:http_requests_unless_single_alert:rate2h": { + noDependentRules: true, + noDependencyRules: true, + }, + "job:http_requests_unless_multiple_alerts:rate2h": { + noDependentRules: true, + noDependencyRules: true, + }, }, }, { @@ -2622,6 +2678,14 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) { noDependentRules: true, noDependencyRules: false, }, + "TooManyFailures": { + noDependentRules: false, + noDependencyRules: true, + }, + "TooManyFailuresAlerts": { + noDependentRules: true, + noDependencyRules: false, + }, }, }, {