diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f7244646e2..e36665857b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -137,6 +137,7 @@ type flagConfig struct { forGracePeriod model.Duration outageTolerance model.Duration resendDelay model.Duration + maxConcurrentEvals int64 web web.Options scrape scrape.Options tsdb tsdbOptions @@ -157,6 +158,7 @@ type flagConfig struct { enablePerStepStats bool enableAutoGOMAXPROCS bool enableAutoGOMEMLIMIT bool + enableConcurrentRuleEval bool prometheusURL string corsRegexString string @@ -203,6 +205,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "auto-gomemlimit": c.enableAutoGOMEMLIMIT = true level.Info(logger).Log("msg", "Automatically set GOMEMLIMIT to match Linux container or system memory limit") + case "concurrent-rule-eval": + c.enableConcurrentRuleEval = true + level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.") case "no-default-scrape-port": c.scrape.NoDefaultPort = true level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.") @@ -411,6 +416,9 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) + serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently."). + Default("4").Int64Var(&cfg.maxConcurrentEvals) + a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps) @@ -749,17 +757,19 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, + ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, }) } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 747457de1d..2faea5b15e 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -48,6 +48,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | +| --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index adefaad4b0..c3540cc234 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -212,3 +212,15 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details). Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped. + +## Concurrent evaluation of independent rules + +`--enable-feature=concurrent-rule-eval` + +By default, rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the +output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no +reason to run them sequentially. +When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently. +This has the potential to improve rule group evaluation latency and resource utilization at the expense of adding more concurrent query load. + +The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals`, which is set to `4` by default. diff --git a/rules/fixtures/rules_dependencies.yaml b/rules/fixtures/rules_dependencies.yaml new file mode 100644 index 0000000000..31d2c61763 --- /dev/null +++ b/rules/fixtures/rules_dependencies.yaml @@ -0,0 +1,7 @@ +groups: + - name: test + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: HighRequestRate + expr: job:http_requests:rate5m > 100 diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml new file mode 100644 index 0000000000..db57bede1b --- /dev/null +++ b/rules/fixtures/rules_multiple.yaml @@ -0,0 +1,14 @@ +groups: + - name: test + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyRequests + expr: job:http_requests:rate15m > 100 diff --git a/rules/fixtures/rules_multiple_groups.yaml b/rules/fixtures/rules_multiple_groups.yaml new file mode 100644 index 0000000000..87f31a6ca5 --- /dev/null +++ b/rules/fixtures/rules_multiple_groups.yaml @@ -0,0 +1,28 @@ +groups: + - name: http + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyHTTPRequests + expr: job:http_requests:rate15m > 100 + + - name: grpc + rules: + # independents + - record: job:grpc_requests:rate1m + expr: sum by (job)(rate(grpc_requests_total[1m])) + - record: job:grpc_requests:rate5m + expr: sum by (job)(rate(grpc_requests_total[5m])) + + # dependents + - record: job:grpc_requests:rate15m + expr: sum by (job)(rate(grpc_requests_total[15m])) + - record: TooManyGRPCRequests + expr: job:grpc_requests:rate15m > 100 diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml new file mode 100644 index 0000000000..e071be3eff --- /dev/null +++ b/rules/fixtures/rules_multiple_independent.yaml @@ -0,0 +1,15 @@ +groups: + - name: independents + rules: + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) diff --git a/rules/group.go b/rules/group.go index 55673452e5..5ee06dc0ba 100644 --- a/rules/group.go +++ b/rules/group.go @@ -21,8 +21,11 @@ import ( "sync" "time" + "go.uber.org/atomic" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/promql/parser" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -68,6 +71,9 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc + + // concurrencyController controls the rules evaluation concurrency. + concurrencyController RuleConcurrencyController } // GroupEvalIterationFunc is used to implement and extend rule group @@ -111,21 +117,27 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } + concurrencyController := o.Opts.RuleConcurrencyController + if concurrencyController == nil { + concurrencyController = sequentialRuleEvalController{} + } + return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - evalIterationFunc: evalIterationFunc, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, + concurrencyController: concurrencyController, } } @@ -420,8 +432,13 @@ func (g *Group) CopyState(from *Group) { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal float64 + var ( + samplesTotal atomic.Float64 + wg sync.WaitGroup + ) + for i, rule := range g.rules { select { case <-g.done: @@ -429,7 +446,11 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - func(i int, rule Rule) { + eval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() + } + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) @@ -465,7 +486,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } rule.SetHealth(HealthGood) rule.SetLastError(nil) - samplesTotal += float64(len(vector)) + samplesTotal.Add(float64(len(vector))) if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) @@ -554,11 +575,25 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - }(i, rule) - } - if g.metrics != nil { - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) + } + + // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. + // Try run concurrently if there are slots available. + if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() { + wg.Add(1) + + go eval(i, rule, func() { + wg.Done() + ctrl.Done() + }) + } else { + eval(i, rule, nil) + } } + + wg.Wait() + + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) g.cleanupStaleSeries(ctx, ts) } @@ -866,3 +901,110 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { return m } + +// dependencyMap is a data-structure which contains the relationships between rules within a group. +// It is used to describe the dependency associations between rules in a group whereby one rule uses the +// output metric produced by another rule in its expression (i.e. as its "input"). +type dependencyMap map[Rule][]Rule + +// dependents returns the count of rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) int { + return len(m[r]) +} + +// dependencies returns the count of rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) int { + if len(m) == 0 { + return 0 + } + + var count int + for _, children := range m { + for _, child := range children { + if child == r { + count++ + } + } + } + + return count +} + +// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule +// dependent on its output. +func (m dependencyMap) isIndependent(r Rule) bool { + if m == nil { + return false + } + + return m.dependents(r)+m.dependencies(r) == 0 +} + +// buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +// +// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose +// output an Alert rule depends will not be able to run concurrently. +// +// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be +// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: +// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector +// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE +// +// Rules which are independent can run concurrently with no side-effects. +func buildDependencyMap(rules []Rule) dependencyMap { + dependencies := make(dependencyMap) + + if len(rules) <= 1 { + // No relationships if group has 1 or fewer rules. + return dependencies + } + + inputs := make(map[string][]Rule, len(rules)) + outputs := make(map[string][]Rule, len(rules)) + + var indeterminate bool + + for _, rule := range rules { + if indeterminate { + break + } + + name := rule.Name() + outputs[name] = append(outputs[name], rule) + + parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { + if n, ok := node.(*parser.VectorSelector); ok { + // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, + // which means we cannot safely run any rules concurrently. + if n.Name == "" && len(n.LabelMatchers) > 0 { + indeterminate = true + return nil + } + + // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour + // if they run concurrently. + if n.Name == alertMetricName || n.Name == alertForStateMetricName { + indeterminate = true + return nil + } + + inputs[n.Name] = append(inputs[n.Name], rule) + } + return nil + }) + } + + if indeterminate { + return nil + } + + for output, outRules := range outputs { + for _, outRule := range outRules { + if inRules, found := inputs[output]; found && len(inRules) > 0 { + dependencies[outRule] = append(dependencies[outRule], inRules...) + } + } + } + + return dependencies +} diff --git a/rules/manager.go b/rules/manager.go index ed4d42ebad..477508dc04 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -103,18 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + RuleConcurrencyController RuleConcurrencyController Metrics *Metrics } @@ -130,6 +134,14 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + if o.RuleConcurrencyController == nil { + if o.ConcurrentEvalsEnabled { + o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + } else { + o.RuleConcurrencyController = sequentialRuleEvalController{} + } + } + m := &Manager{ groups: map[string]*Group{}, opts: o, @@ -176,6 +188,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() + m.opts.RuleConcurrencyController.Invalidate() + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) if errs != nil { @@ -403,3 +417,80 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } } + +// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount +// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure +// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis. +type RuleConcurrencyController interface { + // RuleEligible determines if the rule can guarantee correct results while running concurrently. + RuleEligible(g *Group, r Rule) bool + + // Allow determines whether any concurrent evaluation slots are available. + // If Allow() returns true, then Done() must be called to release the acquired slot. + Allow() bool + + // Done releases a concurrent evaluation slot. + Done() + + // Invalidate instructs the controller to invalidate its state. + // This should be called when groups are modified (during a reload, for instance), because the controller may + // store some state about each group in order to more efficiently determine rule eligibility. + Invalidate() +} + +// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +type concurrentRuleEvalController struct { + sema *semaphore.Weighted + depMapsMu sync.Mutex + depMaps map[*Group]dependencyMap +} + +func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } +} + +func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) +} + +func (c *concurrentRuleEvalController) Allow() bool { + return c.sema.TryAcquire(1) +} + +func (c *concurrentRuleEvalController) Done() { + c.sema.Release(1) +} + +func (c *concurrentRuleEvalController) Invalidate() { + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. + c.depMaps = map[*Group]dependencyMap{} +} + +// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. +type sequentialRuleEvalController struct{} + +func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { + return false +} + +func (c sequentialRuleEvalController) Allow() bool { + return false +} + +func (c sequentialRuleEvalController) Done() {} +func (c sequentialRuleEvalController) Invalidate() {} diff --git a/rules/manager_test.go b/rules/manager_test.go index 3feae51de6..07ec06104d 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -19,11 +19,13 @@ import ( "math" "os" "sort" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -674,8 +676,10 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: st, + Appendable: st, + RuleConcurrencyController: sequentialRuleEvalController{}, }, + metrics: NewGroupMetrics(nil), } newGroup.CopyState(oldGroup) @@ -1402,3 +1406,616 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { require.Equal(t, expHist, fh) require.Equal(t, chunkenc.ValNone, it.Next()) } + +func TestDependencyMap(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("sum by (user) (rate(requests[5m]))") + require.NoError(t, err) + rule3 := NewRecordingRule("user:requests:rate5m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("increase(user:requests:rate1m[1h])") + require.NoError(t, err) + rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2, rule3, rule4}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + + require.Zero(t, depMap.dependencies(rule)) + require.Equal(t, 2, depMap.dependents(rule)) + require.False(t, depMap.isIndependent(rule)) + + require.Zero(t, depMap.dependents(rule2)) + require.Equal(t, 1, depMap.dependencies(rule2)) + require.False(t, depMap.isIndependent(rule2)) + + require.Zero(t, depMap.dependents(rule3)) + require.Zero(t, depMap.dependencies(rule3)) + require.True(t, depMap.isIndependent(rule3)) + + require.Zero(t, depMap.dependents(rule4)) + require.Equal(t, 1, depMap.dependencies(rule4)) + require.False(t, depMap.isIndependent(rule4)) +} + +func TestNoDependency(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A group with only one rule cannot have dependencies. + require.Empty(t, depMap) +} + +func TestDependenciesEdgeCases(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + t.Run("empty group", func(t *testing.T) { + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{}, // empty group + Opts: opts, + }) + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + depMap := buildDependencyMap(group.rules) + // A group with no rules has no dependency map, but doesn't panic if the map is queried. + require.Empty(t, depMap) + require.True(t, depMap.isIndependent(rule)) + }) + + t.Run("rules which reference no series", func(t *testing.T) { + expr, err := parser.ParseExpr("one") + require.NoError(t, err) + rule1 := NewRecordingRule("1", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("two") + require.NoError(t, err) + rule2 := NewRecordingRule("2", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A group with rules which reference no series will still produce a dependency map + require.True(t, depMap.isIndependent(rule1)) + require.True(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule with regexp matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__=~".+"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with regexp matcher on metric name causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule with not equal matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__!="requests", service="app"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with not equal matcher on metric name causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule with not regexp matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__!~"requests.+", service="app"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with not regexp matcher on metric name causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule querying ALERTS metric", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum(ALERTS{alertname="test"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule querying ALERTS metric causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule querying ALERTS_FOR_STATE metric", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum(ALERTS_FOR_STATE{alertname="test"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule querying ALERTS_FOR_STATE metric causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) +} + +func TestNoMetricSelector(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`count({user="bob"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore + // all rules are not considered independent. + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) +} + +func TestDependentRulesWithNonMetricExpression(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("3") + require.NoError(t, err) + rule3 := NewRecordingRule("three", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2, rule3}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule3)) +} + +func TestRulesDependentOnMetaMetrics(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by + // the rule engine, and is therefore not independent. + expr, err := parser.ParseExpr("count(ALERTS)") + require.NoError(t, err) + rule := NewRecordingRule("alert_count", expr, labels.Labels{}) + + // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules). + expr, err = parser.ParseExpr("1") + require.NoError(t, err) + rule2 := NewRecordingRule("one", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) +} + +func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { + files := []string{"fixtures/rules.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + + ruleManager.start() + defer ruleManager.Stop() + + err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + require.NotEmpty(t, ruleManager.groups, "expected non-empty rule groups") + + orig := make(map[string]dependencyMap, len(ruleManager.groups)) + for _, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) + // No dependency map is expected because there is only one rule in the group. + require.Empty(t, depMap) + orig[g.Name()] = depMap + } + + // Update once without changing groups. + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + for h, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) + // Dependency maps are the same because of no updates. + if orig[h] == nil { + require.Empty(t, orig[h]) + require.Empty(t, depMap) + } else { + require.Equal(t, orig[h], depMap) + } + + } + + // Groups will be recreated when updated. + files[0] = "fixtures/rules_dependencies.yaml" + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + + for h, g := range ruleManager.groups { + const ruleName = "job:http_requests:rate5m" + var rr *RecordingRule + + for _, r := range g.rules { + if r.Name() == ruleName { + rr = r.(*RecordingRule) + } + } + + require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + + depMap := buildDependencyMap(g.rules) + // Dependency maps must change because the groups would've been updated. + require.NotEqual(t, orig[h], depMap) + // We expect there to be some dependencies since the new rule group contains a dependency. + require.NotEmpty(t, depMap) + require.Equal(t, 1, depMap.dependents(rr)) + require.Zero(t, depMap.dependencies(rr)) + } +} + +func TestAsyncRuleEvaluation(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + ) + + t.Run("synchronous evaluation with independent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + ruleCount := 4 + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + + t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + + t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + + } + }) + + t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) +} + +func TestBoundedRuleEvalConcurrency(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + maxConcurrency int64 = 3 + groupCount = 2 + ) + + files := []string{"fixtures/rules_multiple_groups.yaml"} + + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, maxConcurrency)) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, groupCount) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Evaluate groups concurrently (like they normally do). + var wg sync.WaitGroup + for _, group := range groups { + group := group + + wg.Add(1) + go func() { + group.Eval(ctx, time.Now()) + wg.Done() + }() + } + + wg.Wait() + + // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations. + require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount)) +} + +const artificialDelay = 10 * time.Millisecond + +func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions { + var inflightMu sync.Mutex + + concurrent := maxConcurrent > 0 + + return &ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + ConcurrentEvalsEnabled: concurrent, + MaxConcurrentEvals: maxConcurrent, + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightMu.Lock() + + current := inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + highWatermark := maxInflight.Load() + + if current > highWatermark { + maxInflight.Store(current) + } + inflightMu.Unlock() + + // Artificially delay all query executions to highlight concurrent execution improvement. + time.Sleep(artificialDelay) + + // Return a stub sample. + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + } +}