Mirror of https://github.com/prometheus/prometheus.git
Optimise concurrent rule evaluation for rules querying ALERTS and ALERTS_FOR_STATE (#17064)
* Optimise concurrent rule evaluation for rules querying ALERTS and ALERTS_FOR_STATE

* Further optimised the case of ALERTS and ALERTS_FOR_STATE without an alertname label matcher

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Parent: 8b3f59e9c3
Commit: 954cad35b2
@@ -7,8 +7,14 @@ groups:
   - record: job:http_requests:rate5m
     expr: sum by (job)(rate(http_requests_total[5m]))

-  # dependents
+  # dependents because the metric name matches
   - record: job:http_requests:rate15m
     expr: sum by (job)(rate(http_requests_total[15m]))
   - record: TooManyRequests
     expr: job:http_requests:rate15m > 100
+
+  # dependents because ALERTS matches
+  - alert: TooManyFailures
+    expr: sum by (job)(rate(http_requests_failed_total[1m])) > 100
+  - alert: TooManyFailuresAlerts
+    expr: count(ALERTS{alertname="TooManyFailures"}) > 5
@@ -13,3 +13,7 @@ groups:
     expr: sum by (job)(rate(http_requests_total[1h]))
   - record: job:http_requests:rate2h
     expr: sum by (job)(rate(http_requests_total[2h]))
+  - record: job:http_requests_unless_single_alert:rate2h
+    expr: sum by (job)(rate(http_requests_total[2h])) unless (count by (job) (ALERTS{alertname="alert_1"}) > 0)
+  - record: job:http_requests_unless_multiple_alerts:rate2h
+    expr: sum by (job)(rate(http_requests_total[2h])) unless (count by (job) (ALERTS{alertname=~"alert_1|alert_2"}) > 0)
@@ -1092,13 +1092,12 @@ func (m dependencyMap) isIndependent(r Rule) bool {

 // buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
 //
-// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose
-// output an Alert rule depends will not be able to run concurrently.
+// Both alerting and recording rules can have dependents and dependencies. An alerting rule can have dependents if
+// another rule within the group queries the ALERTS or ALERTS_FOR_STATE meta-metrics.
 //
 // There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be
 // inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour:
 // - wildcard queriers like {cluster="prod1"} which would match every series with that label selector
-// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE
 //
 // Rules which are independent can run concurrently with no side-effects.
 func buildDependencyMap(rules []Rule) dependencyMap {
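To illustrate the documented behaviour, here is a minimal, hypothetical test-style sketch. It assumes the internal rules package API exercised by the tests in this commit (buildDependencyMap, NewAlertingRule, NewGroup, ManagerOptions), so it only compiles inside that package: a rule that queries ALERTS{alertname="TooManyFailures"} becomes a dependent of the alerting rule with that name, so the pair is evaluated sequentially while unrelated rules in the group stay eligible for concurrent evaluation.

package rules

import (
	"context"
	"testing"
	"time"

	"github.com/prometheus/common/promslog"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"
)

// TestAlertsSelectorCreatesDependency is an illustrative example, not part of the commit.
func TestAlertsSelectorCreatesDependency(t *testing.T) {
	opts := &ManagerOptions{Context: context.Background(), Logger: promslog.NewNopLogger()}

	// An alerting rule named "TooManyFailures".
	expr, err := parser.ParseExpr(`sum by (job)(rate(http_requests_failed_total[1m])) > 100`)
	require.NoError(t, err)
	failures := NewAlertingRule("TooManyFailures", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())

	// A second alerting rule that queries the ALERTS meta-metric for that alert name.
	expr, err = parser.ParseExpr(`count(ALERTS{alertname="TooManyFailures"}) > 5`)
	require.NoError(t, err)
	tooMany := NewAlertingRule("TooManyFailuresAlerts", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())

	group := NewGroup(GroupOptions{
		Name:     "rule_group",
		Interval: time.Second,
		Rules:    []Rule{failures, tooMany},
		Opts:     opts,
	})

	depMap := buildDependencyMap(group.rules)

	// The ALERTS{alertname="TooManyFailures"} selector links the two rules, so neither
	// is independent and they cannot run concurrently with each other.
	require.False(t, depMap.isIndependent(failures))
	require.ElementsMatch(t, depMap.dependents(failures), []Rule{tooMany})
	require.False(t, depMap.isIndependent(tooMany))
	require.ElementsMatch(t, depMap.dependencies(tooMany), []Rule{failures})
}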
@@ -1138,22 +1137,43 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 					return nil
 				}

-				// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
-				// if they run concurrently.
-				if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) {
-					indeterminate = true
-					return nil
+				// Check whether the vector selector queries "meta-metrics" like ALERTS and ALERTS_FOR_STATE and,
+				// if so, extract the "alertname" label matcher (it may be missing).
+				nameMatchesAlerts := nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName)
+				var alertsNameMatcher *labels.Matcher
+				if nameMatchesAlerts {
+					for _, m := range n.LabelMatchers {
+						if m.Name == labels.AlertName {
+							alertsNameMatcher = m
+							break
+						}
+					}
 				}

-				// Find rules which depend on the output of this rule.
+				// Find the other rules that this rule depends on.
 				for _, other := range rules {
+					// Rules are evaluated in the order they are defined in the group. Once we reach this rule we
+					// can stop searching: rules defined later cannot be dependencies of this rule, because they
+					// are evaluated after it. Later rules may still query this rule's output, but that makes them
+					// dependents, not dependencies.
 					if other == rule {
-						continue
+						break
 					}

 					otherName := other.Name()
+
+					// If this rule's vector selector matches the other rule's name, it's a dependency.
 					if nameMatcher.Matches(otherName) {
 						dependencies[other] = append(dependencies[other], rule)
+						continue
+					}
+
+					// If this rule's vector selector queries the alerts meta-metrics and the other rule is an
+					// alerting rule, check whether the "alertname" matches. If it does, it's a dependency.
+					if _, otherIsAlertingRule := other.(*AlertingRule); nameMatchesAlerts && otherIsAlertingRule {
+						if alertsNameMatcher == nil || alertsNameMatcher.Matches(otherName) {
+							dependencies[other] = append(dependencies[other], rule)
+						}
 					}
 				}
 			}
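The alertname comparison above reuses the standard labels.Matcher semantics, so equality, regex, and negative matchers all participate in dependency detection, and a selector with no alertname matcher at all falls back to depending on every alerting rule defined earlier in the group. A small standalone sketch of those semantics (illustrative only; it uses just the public model/labels package):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// buildDependencyMap keeps the "alertname" matcher found on an ALERTS /
	// ALERTS_FOR_STATE selector and runs it against the names of the alerting
	// rules defined earlier in the group.
	eq := labels.MustNewMatcher(labels.MatchEqual, labels.AlertName, "TooManyFailures")
	re := labels.MustNewMatcher(labels.MatchRegexp, labels.AlertName, "first.*")
	ne := labels.MustNewMatcher(labels.MatchNotEqual, labels.AlertName, "first")

	fmt.Println(eq.Matches("TooManyFailures")) // true: a dependency on that single alerting rule
	fmt.Println(eq.Matches("SomethingElse"))   // false: no dependency recorded
	fmt.Println(re.Matches("firstAlert"))      // true: matchers are anchored, so "first.*" covers "firstAlert"
	fmt.Println(ne.Matches("second"))          // true: negative matchers can match many alerting rules

	// With no "alertname" matcher on the selector (alertsNameMatcher == nil in the
	// code above), the rule is treated as depending on every earlier alerting rule.
}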
@@ -1771,49 +1771,119 @@ func TestDependenciesEdgeCases(t *testing.T) {
 		require.False(t, depMap.isIndependent(rule2))
 	})

-	t.Run("rule querying ALERTS metric", func(t *testing.T) {
-		expr, err := parser.ParseExpr("sum(requests)")
-		require.NoError(t, err)
-		rule1 := NewRecordingRule("first", expr, labels.Labels{})
-
-		expr, err = parser.ParseExpr(`sum(ALERTS{alertname="test"})`)
-		require.NoError(t, err)
-		rule2 := NewRecordingRule("second", expr, labels.Labels{})
-
-		group := NewGroup(GroupOptions{
-			Name:     "rule_group",
-			Interval: time.Second,
-			Rules:    []Rule{rule1, rule2},
-			Opts:     opts,
-		})
-
-		depMap := buildDependencyMap(group.rules)
-		// A rule querying ALERTS metric causes the whole group to be indeterminate.
-		require.False(t, depMap.isIndependent(rule1))
-		require.False(t, depMap.isIndependent(rule2))
-	})
-
-	t.Run("rule querying ALERTS_FOR_STATE metric", func(t *testing.T) {
-		expr, err := parser.ParseExpr("sum(requests)")
-		require.NoError(t, err)
-		rule1 := NewRecordingRule("first", expr, labels.Labels{})
-
-		expr, err = parser.ParseExpr(`sum(ALERTS_FOR_STATE{alertname="test"})`)
-		require.NoError(t, err)
-		rule2 := NewRecordingRule("second", expr, labels.Labels{})
-
-		group := NewGroup(GroupOptions{
-			Name:     "rule_group",
-			Interval: time.Second,
-			Rules:    []Rule{rule1, rule2},
-			Opts:     opts,
-		})
-
-		depMap := buildDependencyMap(group.rules)
-		// A rule querying ALERTS_FOR_STATE metric causes the whole group to be indeterminate.
-		require.False(t, depMap.isIndependent(rule1))
-		require.False(t, depMap.isIndependent(rule2))
-	})
+	for _, metaMetric := range []string{alertMetricName, alertForStateMetricName} {
+		t.Run(metaMetric, func(t *testing.T) {
+			t.Run("rule querying alerts meta-metric with alertname", func(t *testing.T) {
+				expr, err := parser.ParseExpr("sum(requests) > 0")
+				require.NoError(t, err)
+				rule1 := NewAlertingRule("first", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname="test"}) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule2 := NewAlertingRule("second", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname=~"first.*"}) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule3 := NewAlertingRule("third", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s{alertname!="first"}) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule4 := NewAlertingRule("fourth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr("sum(failures)")
+				require.NoError(t, err)
+				rule5 := NewRecordingRule("fifth", expr, labels.Labels{})
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`fifth > 0 and sum(%s{alertname="fourth"}) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule6 := NewAlertingRule("sixth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				group := NewGroup(GroupOptions{
+					Name:     "rule_group",
+					Interval: time.Second,
+					Rules:    []Rule{rule1, rule2, rule3, rule4, rule5, rule6},
+					Opts:     opts,
+				})
+
+				depMap := buildDependencyMap(group.rules)
+
+				require.False(t, depMap.isIndependent(rule1))
+				require.Empty(t, depMap.dependencies(rule1))
+				require.ElementsMatch(t, depMap.dependents(rule1), []Rule{rule3})
+
+				require.False(t, depMap.isIndependent(rule2))
+				require.Empty(t, depMap.dependencies(rule2))
+				require.ElementsMatch(t, depMap.dependents(rule2), []Rule{rule4})
+
+				require.False(t, depMap.isIndependent(rule3))
+				require.ElementsMatch(t, depMap.dependencies(rule3), []Rule{rule1})
+				require.ElementsMatch(t, depMap.dependents(rule3), []Rule{rule4})
+
+				require.False(t, depMap.isIndependent(rule4))
+				require.ElementsMatch(t, depMap.dependencies(rule4), []Rule{rule2, rule3})
+				require.ElementsMatch(t, depMap.dependents(rule4), []Rule{rule6})
+
+				require.False(t, depMap.isIndependent(rule5))
+				require.Empty(t, depMap.dependencies(rule5))
+				require.ElementsMatch(t, depMap.dependents(rule5), []Rule{rule6})
+
+				require.False(t, depMap.isIndependent(rule6))
+				require.ElementsMatch(t, depMap.dependencies(rule6), []Rule{rule4, rule5})
+				require.Empty(t, depMap.dependents(rule6))
+			})
+
+			t.Run("rule querying alerts meta-metric without alertname", func(t *testing.T) {
+				expr, err := parser.ParseExpr("sum(requests)")
+				require.NoError(t, err)
+				rule1 := NewRecordingRule("first", expr, labels.Labels{})
+
+				expr, err = parser.ParseExpr(`sum(requests) > 0`)
+				require.NoError(t, err)
+				rule2 := NewAlertingRule("second", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`sum(%s) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule3 := NewAlertingRule("third", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				expr, err = parser.ParseExpr("sum(failures)")
+				require.NoError(t, err)
+				rule4 := NewRecordingRule("fourth", expr, labels.Labels{})
+
+				expr, err = parser.ParseExpr(fmt.Sprintf(`fourth > 0 and sum(%s) > 0`, metaMetric))
+				require.NoError(t, err)
+				rule5 := NewAlertingRule("fifth", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, promslog.NewNopLogger())
+
+				group := NewGroup(GroupOptions{
+					Name:     "rule_group",
+					Interval: time.Second,
+					Rules:    []Rule{rule1, rule2, rule3, rule4, rule5},
+					Opts:     opts,
+				})
+
+				depMap := buildDependencyMap(group.rules)
+
+				require.True(t, depMap.isIndependent(rule1))
+				require.Empty(t, depMap.dependencies(rule1))
+				require.Empty(t, depMap.dependents(rule1))
+
+				require.False(t, depMap.isIndependent(rule2))
+				require.Empty(t, depMap.dependencies(rule2))
+				require.ElementsMatch(t, depMap.dependents(rule2), []Rule{rule3, rule5})
+
+				require.False(t, depMap.isIndependent(rule3))
+				require.ElementsMatch(t, depMap.dependencies(rule3), []Rule{rule2})
+				require.ElementsMatch(t, depMap.dependents(rule3), []Rule{rule5})
+
+				require.False(t, depMap.isIndependent(rule4))
+				require.Empty(t, depMap.dependencies(rule4))
+				require.ElementsMatch(t, depMap.dependents(rule4), []Rule{rule5})
+
+				require.False(t, depMap.isIndependent(rule5))
+				require.ElementsMatch(t, depMap.dependencies(rule5), []Rule{rule2, rule3, rule4})
+				require.Empty(t, depMap.dependents(rule5))
+			})
+		})
+	}
 }

 func TestNoMetricSelector(t *testing.T) {
@@ -1877,35 +1947,6 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) {
 	require.True(t, depMap.isIndependent(rule3))
 }

-func TestRulesDependentOnMetaMetrics(t *testing.T) {
-	ctx := context.Background()
-	opts := &ManagerOptions{
-		Context: ctx,
-		Logger:  promslog.NewNopLogger(),
-	}
-
-	// This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by
-	// the rule engine, and is therefore not independent.
-	expr, err := parser.ParseExpr("count(ALERTS)")
-	require.NoError(t, err)
-	rule := NewRecordingRule("alert_count", expr, labels.Labels{})
-
-	// Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules).
-	expr, err = parser.ParseExpr("1")
-	require.NoError(t, err)
-	rule2 := NewRecordingRule("one", expr, labels.Labels{})
-
-	group := NewGroup(GroupOptions{
-		Name:     "rule_group",
-		Interval: time.Second,
-		Rules:    []Rule{rule, rule2},
-		Opts:     opts,
-	})
-
-	depMap := buildDependencyMap(group.rules)
-	require.False(t, depMap.isIndependent(rule))
-}
-
 func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
 	files := []string{"fixtures/rules.yaml"}
 	ruleManager := NewManager(&ManagerOptions{
@@ -1986,10 +2027,11 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		require.Empty(t, errs)
 		require.Len(t, groups, 1)

-		ruleCount := 4
+		expectedRuleCount := 6
+		expectedSampleCount := 4

 		for _, group := range groups {
-			require.Len(t, group.rules, ruleCount)
+			require.Len(t, group.rules, expectedRuleCount)

 			start := time.Now()
 			DefaultEvalIterationFunc(ctx, group, start)
@@ -2001,9 +2043,9 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 			// Never expect more than 1 inflight query at a time.
 			require.EqualValues(t, 1, maxInflight.Load())
 			// Each rule should take at least 1 second to execute sequentially.
-			require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
-			// Each rule produces one vector.
-			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+			require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
+			// Each recording rule produces one vector.
+			require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
 			// Group duration is higher than the sum of rule durations (group overhead).
 			require.GreaterOrEqual(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
 		}
@@ -2019,7 +2061,8 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)

-		ruleCount := 4
+		expectedRuleCount := 6
+		expectedSampleCount := 4
 		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)

 		// Configure concurrency settings.
@@ -2033,7 +2076,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		require.Len(t, groups, 1)

 		for _, group := range groups {
-			require.Len(t, group.rules, ruleCount)
+			require.Len(t, group.rules, expectedRuleCount)

 			start := time.Now()
 			DefaultEvalIterationFunc(ctx, group, start)
@@ -2041,9 +2084,9 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 			// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
 			require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
 			// Some rules should execute concurrently so should complete quicker.
-			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
-			// Each rule produces one vector.
-			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
+			// Each recording rule produces one vector.
+			require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
 		}
 	})

@@ -2057,7 +2100,8 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)

-		ruleCount := 6
+		expectedRuleCount := 8
+		expectedSampleCount := expectedRuleCount
 		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)

 		// Configure concurrency settings.
@@ -2071,7 +2115,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		require.Len(t, groups, 1)

 		for _, group := range groups {
-			require.Len(t, group.rules, ruleCount)
+			require.Len(t, group.rules, expectedRuleCount)

 			start := time.Now()
 			DefaultEvalIterationFunc(ctx, group, start)
@@ -2079,15 +2123,15 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 			// Expected evaluation order (isn't affected by concurrency settings)
 			order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
 			require.Equal(t, []ConcurrentRules{
-				{0, 1, 2, 3, 4, 5},
+				{0, 1, 2, 3, 4, 5, 6, 7},
 			}, order)

 			// Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
 			require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
 			// Some rules should execute concurrently so should complete quicker.
-			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
-			// Each rule produces one vector.
-			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
+			// Each recording rule produces one vector.
+			require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
 		}
 	})

@@ -2101,12 +2145,13 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)

-		ruleCount := 6
+		expectedRuleCount := 8
+		expectedSampleCount := expectedRuleCount
 		opts := optsFactory(storage, &maxInflight, &inflightQueries, 0)

 		// Configure concurrency settings.
 		opts.ConcurrentEvalsEnabled = true
-		opts.MaxConcurrentEvals = int64(ruleCount) * 2
+		opts.MaxConcurrentEvals = int64(expectedRuleCount) * 2
 		opts.RuleConcurrencyController = nil
 		ruleManager := NewManager(opts)

@@ -2115,7 +2160,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		require.Len(t, groups, 1)

 		for _, group := range groups {
-			require.Len(t, group.rules, ruleCount)
+			require.Len(t, group.rules, expectedRuleCount)

 			start := time.Now()

@@ -2124,15 +2169,15 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 			// Expected evaluation order
 			order := group.opts.RuleConcurrencyController.SplitGroupIntoBatches(ctx, group)
 			require.Equal(t, []ConcurrentRules{
-				{0, 1, 2, 3, 4, 5},
+				{0, 1, 2, 3, 4, 5, 6, 7},
 			}, order)

 			// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
 			require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
 			// Some rules should execute concurrently so should complete quicker.
-			require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
-			// Each rule produces one vector.
-			require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+			require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRuleCount) * artificialDelay).Seconds())
+			// Each recording rule produces one vector.
+			require.EqualValues(t, expectedSampleCount, testutil.ToFloat64(group.metrics.GroupSamples))
 			// Group duration is less than the sum of rule durations
 			require.Less(t, group.GetEvaluationTime(), group.GetRuleEvaluationTimeSum())
 		}
@@ -2532,6 +2577,9 @@ func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.I
 				promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
 			}, nil
 		},
+		NotifyFunc: func(_ context.Context, _ string, _ ...*Alert) {
+			// No-op in tests.
+		},
 	}
 }

@@ -2600,6 +2648,14 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) {
 					noDependentRules:  true,
 					noDependencyRules: true,
 				},
+				"job:http_requests_unless_single_alert:rate2h": {
+					noDependentRules:  true,
+					noDependencyRules: true,
+				},
+				"job:http_requests_unless_multiple_alerts:rate2h": {
+					noDependentRules:  true,
+					noDependencyRules: true,
+				},
 			},
 		},
 		{
@@ -2622,6 +2678,14 @@
 					noDependentRules:  true,
 					noDependencyRules: false,
 				},
+				"TooManyFailures": {
+					noDependentRules:  false,
+					noDependencyRules: true,
+				},
+				"TooManyFailuresAlerts": {
+					noDependentRules:  true,
+					noDependencyRules: false,
+				},
 			},
 		},
 		{