From 940f83a54037dbdc2ac4059bf562518b9f0788df Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 22:31:26 +0200 Subject: [PATCH 01/17] Implementation NOTE: Rebased from main after refactor in #13014 Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 27 +- docs/command-line/prometheus.md | 1 + rules/fixtures/rules_dependencies.yaml | 7 + rules/fixtures/rules_multiple.yaml | 14 + rules/fixtures/rules_multiple_groups.yaml | 28 ++ rules/group.go | 135 +++++++- rules/manager.go | 29 +- rules/manager_test.go | 382 ++++++++++++++++++++++ 8 files changed, 595 insertions(+), 28 deletions(-) create mode 100644 rules/fixtures/rules_dependencies.yaml create mode 100644 rules/fixtures/rules_multiple.yaml create mode 100644 rules/fixtures/rules_multiple_groups.yaml diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f7244646e2..b8b90ffbe7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -137,6 +137,7 @@ type flagConfig struct { forGracePeriod model.Duration outageTolerance model.Duration resendDelay model.Duration + maxConcurrentEvals int64 web web.Options scrape scrape.Options tsdb tsdbOptions @@ -411,6 +412,9 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) + serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently."). + Default("4").Int64Var(&cfg.maxConcurrentEvals) + a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps) @@ -749,17 +753,18 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, }) } diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 747457de1d..de3baa1071 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -48,6 +48,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. 
| `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | +| --rules.max-concurrent-rule-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | diff --git a/rules/fixtures/rules_dependencies.yaml b/rules/fixtures/rules_dependencies.yaml new file mode 100644 index 0000000000..31d2c61763 --- /dev/null +++ b/rules/fixtures/rules_dependencies.yaml @@ -0,0 +1,7 @@ +groups: + - name: test + rules: + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: HighRequestRate + expr: job:http_requests:rate5m > 100 diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml new file mode 100644 index 0000000000..db57bede1b --- /dev/null +++ b/rules/fixtures/rules_multiple.yaml @@ -0,0 +1,14 @@ +groups: + - name: test + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyRequests + expr: job:http_requests:rate15m > 100 diff --git a/rules/fixtures/rules_multiple_groups.yaml b/rules/fixtures/rules_multiple_groups.yaml new file mode 100644 index 0000000000..87f31a6ca5 --- /dev/null +++ b/rules/fixtures/rules_multiple_groups.yaml @@ -0,0 +1,28 @@ +groups: + - name: http + rules: + # independents + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + + # dependents + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: TooManyHTTPRequests + expr: job:http_requests:rate15m > 100 + + - name: grpc + rules: + # independents + - record: job:grpc_requests:rate1m + expr: sum by (job)(rate(grpc_requests_total[1m])) + - record: job:grpc_requests:rate5m + expr: sum by (job)(rate(grpc_requests_total[5m])) + + # dependents + - record: job:grpc_requests:rate15m + expr: sum by (job)(rate(grpc_requests_total[15m])) + - record: TooManyGRPCRequests + expr: job:grpc_requests:rate15m > 100 diff --git a/rules/group.go b/rules/group.go index 55673452e5..c742820a81 100644 --- a/rules/group.go +++ b/rules/group.go @@ -21,8 +21,11 @@ import ( "sync" "time" + "go.uber.org/atomic" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/promql/parser" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -68,6 +71,7 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. 
evalIterationFunc GroupEvalIterationFunc + dependencyMap dependencyMap } // GroupEvalIterationFunc is used to implement and extend rule group @@ -126,6 +130,7 @@ func NewGroup(o GroupOptions) *Group { logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, + dependencyMap: buildDependencyMap(o.Rules), } } @@ -421,7 +426,7 @@ func (g *Group) CopyState(from *Group) { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal float64 + var samplesTotal atomic.Float64 for i, rule := range g.rules { select { case <-g.done: @@ -429,7 +434,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - func(i int, rule Rule) { + eval := func(i int, rule Rule, async bool) { + if async { + defer func() { + g.opts.ConcurrentEvalSema.Release(1) + }() + } logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) @@ -465,7 +475,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } rule.SetHealth(HealthGood) rule.SetLastError(nil) - samplesTotal += float64(len(vector)) + samplesTotal.Add(float64(len(vector))) if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) @@ -554,10 +564,19 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - }(i, rule) + } + + // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. + // Try run concurrently if there are slots available. + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) { + go eval(i, rule, true) + } else { + eval(i, rule, false) + } } + if g.metrics != nil { - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) } g.cleanupStaleSeries(ctx, ts) } @@ -866,3 +885,109 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { return m } + +// dependencyMap is a data-structure which contains the relationships between rules within a group. +// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the +// output metric produced by another recording rule in its expression (i.e. as its "input"). +type dependencyMap map[Rule][]Rule + +// dependents returns all rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) []Rule { + if len(m) == 0 { + return nil + } + + return m[r] +} + +// dependencies returns all the rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) []Rule { + if len(m) == 0 { + return nil + } + + var parents []Rule + for parent, children := range m { + if len(children) == 0 { + continue + } + + for _, child := range children { + if child == r { + parents = append(parents, parent) + } + } + } + + return parents +} + +func (m dependencyMap) isIndependent(r Rule) bool { + if m == nil { + return false + } + + return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0 +} + +// buildDependencyMap builds a data-structure which contains the relationships between rules within a group. 
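+//
+// Editor's note, illustrative only (mirrors fixtures/rules_dependencies.yaml): given
+//
+//	- record: job:http_requests:rate5m
+//	  expr: sum by (job)(rate(http_requests_total[5m]))
+//	- record: HighRequestRate
+//	  expr: job:http_requests:rate5m > 100
+//
+// the resulting map is {job:http_requests:rate5m: [HighRequestRate]}: the first rule
+// has a dependent and the second a dependency, so neither is treated as independent.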
+func buildDependencyMap(rules []Rule) dependencyMap { + dependencies := make(dependencyMap) + + if len(rules) <= 1 { + // No relationships if group has 1 or fewer rules. + return nil + } + + inputs := make(map[string][]Rule, len(rules)) + outputs := make(map[string][]Rule, len(rules)) + + var indeterminate bool + + for _, rule := range rules { + rule := rule + + name := rule.Name() + outputs[name] = append(outputs[name], rule) + + parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { + if n, ok := node.(*parser.VectorSelector); ok { + // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, + // which means we cannot safely run any rules concurrently. + if n.Name == "" && len(n.LabelMatchers) > 0 { + indeterminate = true + return nil + } + + // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour + // if they run concurrently. + if n.Name == alertMetricName || n.Name == alertForStateMetricName { + indeterminate = true + return nil + } + + inputs[n.Name] = append(inputs[n.Name], rule) + } + return nil + }) + } + + if indeterminate { + return nil + } + + if len(inputs) == 0 || len(outputs) == 0 { + // No relationships can be inferred. + return nil + } + + for output, outRules := range outputs { + for _, outRule := range outRules { + if rs, found := inputs[output]; found && len(rs) > 0 { + dependencies[outRule] = append(dependencies[outRule], rs...) + } + } + } + + return dependencies +} diff --git a/rules/manager.go b/rules/manager.go index ed4d42ebad..e9fa94e9e2 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -103,18 +104,20 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. 
type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalSema *semaphore.Weighted + GroupLoader GroupLoader Metrics *Metrics } @@ -130,6 +133,8 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } + o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals) + m := &Manager{ groups: map[string]*Group{}, opts: o, diff --git a/rules/manager_test.go b/rules/manager_test.go index 3feae51de6..e3e156038e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -19,15 +19,18 @@ import ( "math" "os" "sort" + "sync" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" + "golang.org/x/sync/semaphore" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/model/labels" @@ -1402,3 +1405,382 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { require.Equal(t, expHist, fh) require.Equal(t, chunkenc.ValNone, it.Next()) } + +func TestDependencyMap(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule)) + require.Len(t, group.dependencyMap.dependencies(rule), 0) + require.False(t, group.dependencyMap.isIndependent(rule)) + + require.Len(t, group.dependencyMap.dependents(rule2), 0) + require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2)) + require.False(t, group.dependencyMap.isIndependent(rule2)) +} + +func TestNoDependency(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule}, + Opts: opts, + }) + + // A group with only one rule cannot have dependencies. 
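+	// Note that the rule is still reported as NOT independent below: buildDependencyMap
+	// returns a nil map for groups of one or fewer rules, and isIndependent on a nil map
+	// conservatively returns false.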
+ require.False(t, group.dependencyMap.isIndependent(rule)) +} + +func TestNoMetricSelector(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`count({user="bob"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore + // all rules are not considered independent. + require.False(t, group.dependencyMap.isIndependent(rule)) + require.False(t, group.dependencyMap.isIndependent(rule2)) +} + +func TestDependentRulesWithNonMetricExpression(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("user:requests:rate1m <= 0") + require.NoError(t, err) + rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("3") + require.NoError(t, err) + rule3 := NewRecordingRule("three", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2, rule3}, + Opts: opts, + }) + + require.False(t, group.dependencyMap.isIndependent(rule)) + require.False(t, group.dependencyMap.isIndependent(rule2)) + require.True(t, group.dependencyMap.isIndependent(rule3)) +} + +func TestRulesDependentOnMetaMetrics(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by + // the rule engine, and is therefore not independent. + expr, err := parser.ParseExpr("count(ALERTS)") + require.NoError(t, err) + rule := NewRecordingRule("alert_count", expr, labels.Labels{}) + + // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules). + expr, err = parser.ParseExpr("1") + require.NoError(t, err) + rule2 := NewRecordingRule("one", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule, rule2}, + Opts: opts, + }) + + require.False(t, group.dependencyMap.isIndependent(rule)) +} + +func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { + files := []string{"fixtures/rules.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + }) + + ruleManager.start() + defer ruleManager.Stop() + + err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups") + + orig := make(map[string]dependencyMap, len(ruleManager.groups)) + for _, g := range ruleManager.groups { + // No dependency map is expected because there is only one rule in the group. 
+ require.Empty(t, g.dependencyMap) + orig[g.Name()] = g.dependencyMap + } + + // Update once without changing groups. + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + for h, g := range ruleManager.groups { + // Dependency maps are the same because of no updates. + require.Equal(t, orig[h], g.dependencyMap) + } + + // Groups will be recreated when updated. + files[0] = "fixtures/rules_dependencies.yaml" + err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) + require.NoError(t, err) + + for h, g := range ruleManager.groups { + // Dependency maps must change because the groups would've been updated. + require.NotEqual(t, orig[h], g.dependencyMap) + // We expect there to be some dependencies since the new rule group contains a dependency. + require.Greater(t, len(g.dependencyMap), 0) + } +} + +func TestAsyncRuleEvaluation(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + const artificialDelay = time.Second + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + ) + + files := []string{"fixtures/rules_multiple.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + // Artificially delay all query executions to highly concurrent execution improvement. + time.Sleep(artificialDelay) + + // return a stub sample + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + }) + + // Evaluate groups manually to show the impact of async rule evaluations. + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + expectedRules := 4 + + t.Run("synchronous evaluation with independent rules", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + for _, group := range groups { + require.Len(t, group.rules, expectedRules) + + start := time.Now() + + // Never expect more than 1 inflight query at a time. + go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + group.Eval(ctx, start) + + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + } + + cancel() + }) + + t.Run("asynchronous evaluation with independent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + + for _, group := range groups { + // Allow up to 2 concurrent rule evaluations. 
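+			// With 2 slots, up to 3 queries can be in flight at once: 2 concurrent
+			// evaluations plus the one rule the loop is always evaluating
+			// synchronously, hence the high-water mark of 3 asserted below.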
+ group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2) + require.Len(t, group.rules, expectedRules) + + start := time.Now() + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + group.Eval(ctx, start) + + require.EqualValues(t, 3, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + } + + cancel() + }) +} + +func TestBoundedRuleEvalConcurrency(t *testing.T) { + storage := teststorage.New(t) + t.Cleanup(func() { storage.Close() }) + + const artificialDelay = time.Millisecond * 100 + + var ( + inflightQueries atomic.Int32 + maxInflight atomic.Int32 + maxConcurrency int64 = 3 + groupCount = 2 + ) + + files := []string{"fixtures/rules_multiple_groups.yaml"} + ruleManager := NewManager(&ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + MaxConcurrentEvals: maxConcurrency, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + // Artificially delay all query executions to highly concurrent execution improvement. + time.Sleep(artificialDelay) + + // return a stub sample + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + }) + + // Evaluate groups manually to show the impact of async rule evaluations. + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, groupCount) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + }() + + // Evaluate groups concurrently (like they normally do). + var wg sync.WaitGroup + for _, group := range groups { + group := group + + wg.Add(1) + go func() { + group.Eval(ctx, time.Now()) + wg.Done() + }() + } + + wg.Wait() + cancel() + + // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations. 
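+	// For example, with maxConcurrency=3 and groupCount=2, each group always has one
+	// rule evaluating synchronously while up to 3 rules evaluate concurrently, so the
+	// peak is 3+2=5 inflight queries.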
+	require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
+}

From e7758d187e93817f86e3d9adf78dcaf6c79c15c0 Mon Sep 17 00:00:00 2001
From: Danny Kopping
Date: Wed, 25 Oct 2023 23:05:01 +0200
Subject: [PATCH 02/17] Add feature flag

Signed-off-by: Danny Kopping
---
 cmd/prometheus/main.go |  4 ++++
 docs/feature_flags.md  | 10 ++++++++++
 2 files changed, 14 insertions(+)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index b8b90ffbe7..beb9e3b0ad 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -158,6 +158,7 @@ type flagConfig struct {
 	enablePerStepStats       bool
 	enableAutoGOMAXPROCS     bool
 	enableAutoGOMEMLIMIT     bool
+	enableConcurrentRuleEval bool

 	prometheusURL   string
 	corsRegexString string
@@ -204,6 +205,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
 		case "auto-gomemlimit":
 			c.enableAutoGOMEMLIMIT = true
 			level.Info(logger).Log("msg", "Automatically set GOMEMLIMIT to match Linux container or system memory limit")
+		case "concurrent-rule-eval":
+			c.enableConcurrentRuleEval = true
+			level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.")
 		case "no-default-scrape-port":
 			c.scrape.NoDefaultPort = true
 			level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index adefaad4b0..5517018df4 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -212,3 +212,13 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val
 Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details).

 Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.
+
+## Concurrent evaluation of independent rules
+
+`--enable-feature=concurrent-rule-eval`
+
+Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the
+output of a preceding rule as their input. However, if there is no detectable relationship between rules then there is no
+reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query
+load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals` which is set
+to `4` by default.
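Editor's note: the independence detection this flag relies on is implemented by
buildDependencyMap in PATCH 01 above. The following is a minimal standalone sketch of
the core idea (walk a rule's parsed expression and check whether any vector selector
reads the output of another rule), assuming only the promql/parser package; dependsOn
is an invented helper name, not part of the patch.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

// dependsOn reports whether the PromQL expression expr selects the series named
// output, i.e. whether a rule with this expression must run after the rule which
// records output.
func dependsOn(expr, output string) (bool, error) {
	node, err := parser.ParseExpr(expr)
	if err != nil {
		return false, err
	}
	var found bool
	parser.Inspect(node, func(n parser.Node, _ []parser.Node) error {
		if vs, ok := n.(*parser.VectorSelector); ok && vs.Name == output {
			found = true
		}
		return nil
	})
	return found, nil
}

func main() {
	ok, err := dependsOn("job:http_requests:rate15m > 100", "job:http_requests:rate15m")
	if err != nil {
		panic(err)
	}
	// Prints true: the alert reads the recording rule's output, so the pair stays
	// sequential even with the feature flag enabled.
	fmt.Println(ok)
}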
From e7758d187e93817f86e3d9adf78dcaf6c79c15c0 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 23:05:25 +0200 Subject: [PATCH 03/17] Refactor concurrency control Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 25 ++++++++++--------- rules/group.go | 13 +++++----- rules/manager.go | 56 +++++++++++++++++++++++++++++++----------- rules/manager_test.go | 12 ++++----- 4 files changed, 67 insertions(+), 39 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index beb9e3b0ad..8dd1d88fa0 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -757,18 +757,19 @@ func main() { queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - Queryable: localStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), - NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), - Context: ctxRule, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - OutageTolerance: time.Duration(cfg.outageTolerance), - ForGracePeriod: time.Duration(cfg.forGracePeriod), - ResendDelay: time.Duration(cfg.resendDelay), - MaxConcurrentEvals: cfg.maxConcurrentEvals, + Appendable: fanoutStorage, + Queryable: localStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), + NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + OutageTolerance: time.Duration(cfg.outageTolerance), + ForGracePeriod: time.Duration(cfg.forGracePeriod), + ResendDelay: time.Duration(cfg.resendDelay), + MaxConcurrentEvals: cfg.maxConcurrentEvals, + ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, }) } diff --git a/rules/group.go b/rules/group.go index c742820a81..2be41c8015 100644 --- a/rules/group.go +++ b/rules/group.go @@ -435,11 +435,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } eval := func(i int, rule Rule, async bool) { - if async { - defer func() { - g.opts.ConcurrentEvalSema.Release(1) - }() - } + defer func() { + if async { + g.opts.ConcurrencyController.Done() + } + }() + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") sp.SetAttributes(attribute.String("name", rule.Name())) @@ -568,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalSema != nil && g.opts.ConcurrentEvalSema.TryAcquire(1) { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index e9fa94e9e2..0aeeae1703 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,20 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. 
type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalSema *semaphore.Weighted - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrencyController ConcurrencyController + GroupLoader GroupLoader Metrics *Metrics } @@ -133,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalSema = semaphore.NewWeighted(o.MaxConcurrentEvals) + o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -408,3 +409,28 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } } + +type ConcurrencyController struct { + enabled bool + sema *semaphore.Weighted +} + +func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { + return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +} + +func (c ConcurrencyController) Allow() bool { + if !c.enabled { + return false + } + + return c.sema.TryAcquire(1) +} + +func (c ConcurrencyController) Done() { + if !c.enabled { + return + } + + c.sema.Release(1) +} diff --git a/rules/manager_test.go b/rules/manager_test.go index e3e156038e..c2b23716f1 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" - "golang.org/x/sync/semaphore" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/model/labels" @@ -1672,7 +1671,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. 
- group.opts.ConcurrentEvalSema = semaphore.NewWeighted(2) + group.opts.ConcurrencyController = NewConcurrencyController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1722,10 +1721,11 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { files := []string{"fixtures/rules_multiple_groups.yaml"} ruleManager := NewManager(&ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - MaxConcurrentEvals: maxConcurrency, + Context: context.Background(), + Logger: log.NewNopLogger(), + Appendable: storage, + ConcurrentEvalsEnabled: true, + MaxConcurrentEvals: maxConcurrency, QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { inflightQueries.Add(1) defer func() { From 0dc7036db3f9747a3b6ee43278f67f6591400ec0 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 11:25:12 +0200 Subject: [PATCH 04/17] Optimising dependencies/dependents funcs to not produce new slices each request Signed-off-by: Danny Kopping --- rules/group.go | 26 +++++++++++--------------- rules/manager_test.go | 27 ++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/rules/group.go b/rules/group.go index 2be41c8015..568d606b58 100644 --- a/rules/group.go +++ b/rules/group.go @@ -892,35 +892,31 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { // output metric produced by another recording rule in its expression (i.e. as its "input"). type dependencyMap map[Rule][]Rule -// dependents returns all rules which use the output of the given rule as one of their inputs. -func (m dependencyMap) dependents(r Rule) []Rule { - if len(m) == 0 { - return nil - } - - return m[r] +// dependents returns the count of rules which use the output of the given rule as one of their inputs. +func (m dependencyMap) dependents(r Rule) int { + return len(m[r]) } -// dependencies returns all the rules on which the given rule is dependent for input. -func (m dependencyMap) dependencies(r Rule) []Rule { +// dependencies returns the count of rules on which the given rule is dependent for input. +func (m dependencyMap) dependencies(r Rule) int { if len(m) == 0 { - return nil + return 0 } - var parents []Rule - for parent, children := range m { + var count int + for _, children := range m { if len(children) == 0 { continue } for _, child := range children { if child == r { - parents = append(parents, parent) + count++ } } } - return parents + return count } func (m dependencyMap) isIndependent(r Rule) bool { @@ -928,7 +924,7 @@ func (m dependencyMap) isIndependent(r Rule) bool { return false } - return len(m.dependents(r)) == 0 && len(m.dependencies(r)) == 0 + return m.dependents(r)+m.dependencies(r) == 0 } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. 
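Editor's note: the change above swaps slice-building for counting because the
dependency map is consulted for every rule on every evaluation cycle. A hypothetical
micro-benchmark (BenchmarkDependencyCounts is an invented name, not part of the
patch) could confirm from within package rules that the lookups no longer allocate:

package rules

import (
	"testing"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql/parser"
)

// BenchmarkDependencyCounts exercises the count-based dependents/dependencies
// accessors against a two-rule chain like fixtures/rules_dependencies.yaml.
func BenchmarkDependencyCounts(b *testing.B) {
	expr, err := parser.ParseExpr("sum by (job)(rate(http_requests_total[5m]))")
	if err != nil {
		b.Fatal(err)
	}
	in := NewRecordingRule("job:http_requests:rate5m", expr, labels.Labels{})

	expr, err = parser.ParseExpr("job:http_requests:rate5m > 100")
	if err != nil {
		b.Fatal(err)
	}
	out := NewRecordingRule("HighRequestRate", expr, labels.Labels{})

	depMap := buildDependencyMap([]Rule{in, out})

	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		// Both accessors now return plain ints; no slice is materialized per call.
		if depMap.dependents(in) != 1 || depMap.dependencies(out) != 1 {
			b.Fatal("unexpected dependency counts")
		}
	}
}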
diff --git a/rules/manager_test.go b/rules/manager_test.go index c2b23716f1..2a9b3a1d73 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1419,20 +1419,37 @@ func TestDependencyMap(t *testing.T) { expr, err = parser.ParseExpr("user:requests:rate1m <= 0") require.NoError(t, err) rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger()) + + expr, err = parser.ParseExpr("sum by (user) (rate(requests[5m]))") + require.NoError(t, err) + rule3 := NewRecordingRule("user:requests:rate5m", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("increase(user:requests:rate1m[1h])") + require.NoError(t, err) + rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{}) + group := NewGroup(GroupOptions{ Name: "rule_group", Interval: time.Second, - Rules: []Rule{rule, rule2}, + Rules: []Rule{rule, rule2, rule3, rule4}, Opts: opts, }) - require.Equal(t, []Rule{rule2}, group.dependencyMap.dependents(rule)) - require.Len(t, group.dependencyMap.dependencies(rule), 0) + require.Zero(t, group.dependencyMap.dependencies(rule)) + require.Equal(t, 2, group.dependencyMap.dependents(rule)) require.False(t, group.dependencyMap.isIndependent(rule)) - require.Len(t, group.dependencyMap.dependents(rule2), 0) - require.Equal(t, []Rule{rule}, group.dependencyMap.dependencies(rule2)) + require.Zero(t, group.dependencyMap.dependents(rule2)) + require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) require.False(t, group.dependencyMap.isIndependent(rule2)) + + require.Zero(t, group.dependencyMap.dependents(rule3)) + require.Zero(t, group.dependencyMap.dependencies(rule3)) + require.True(t, group.dependencyMap.isIndependent(rule3)) + + require.Zero(t, group.dependencyMap.dependents(rule4)) + require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) + require.False(t, group.dependencyMap.isIndependent(rule4)) } func TestNoDependency(t *testing.T) { From 94cdfa30cd724842c2971b6b01ed520041ac119e Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 11:44:20 +0200 Subject: [PATCH 05/17] Refactoring Signed-off-by: Danny Kopping --- rules/group.go | 24 +++++++++----- rules/manager.go | 53 ++++++++++++++++++------------- rules/manager_test.go | 73 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 37 deletions(-) diff --git a/rules/group.go b/rules/group.go index 568d606b58..7f53e1b474 100644 --- a/rules/group.go +++ b/rules/group.go @@ -437,7 +437,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrencyController.Done() + g.opts.ConcurrentEvalsController.Done() } }() @@ -569,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() { + if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) @@ -888,8 +888,8 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics { } // dependencyMap is a data-structure which contains the relationships between rules within a group. 
-// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the -// output metric produced by another recording rule in its expression (i.e. as its "input"). +// It is used to describe the dependency associations between rules in a group whereby one rule uses the +// output metric produced by another rule in its expression (i.e. as its "input"). type dependencyMap map[Rule][]Rule // dependents returns the count of rules which use the output of the given rule as one of their inputs. @@ -905,10 +905,6 @@ func (m dependencyMap) dependencies(r Rule) int { var count int for _, children := range m { - if len(children) == 0 { - continue - } - for _, child := range children { if child == r { count++ @@ -919,6 +915,8 @@ func (m dependencyMap) dependencies(r Rule) int { return count } +// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule +// dependent on its output. func (m dependencyMap) isIndependent(r Rule) bool { if m == nil { return false @@ -928,6 +926,16 @@ func (m dependencyMap) isIndependent(r Rule) bool { } // buildDependencyMap builds a data-structure which contains the relationships between rules within a group. +// +// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose +// output an Alert rule depends will not be able to run concurrently. +// +// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be +// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour: +// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector +// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE +// +// Rules which are independent can run concurrently with no side-effects. func buildDependencyMap(rules []Rule) dependencyMap { dependencies := make(dependencyMap) diff --git a/rules/manager.go b/rules/manager.go index 0aeeae1703..b982f23754 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -104,21 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) // ManagerOptions bundles options for the Manager. 
type ManagerOptions struct { - ExternalURL *url.URL - QueryFunc QueryFunc - NotifyFunc NotifyFunc - Context context.Context - Appendable storage.Appendable - Queryable storage.Queryable - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalsEnabled bool - ConcurrencyController ConcurrencyController - GroupLoader GroupLoader + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrentEvalsController ConcurrentRuleEvalController Metrics *Metrics } @@ -134,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) m := &Manager{ groups: map[string]*Group{}, @@ -410,16 +410,26 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -type ConcurrencyController struct { +// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount +// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. +// Concurrency is controlled globally, not on a per-group basis. +type ConcurrentRuleEvalController interface { + Allow() bool + Done() +} + +// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +type concurrentRuleEvalController struct { enabled bool sema *semaphore.Weighted } -func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController { - return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { + return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} } -func (c ConcurrencyController) Allow() bool { +// Allow determines whether any concurrency slots are available. +func (c concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -427,7 +437,8 @@ func (c ConcurrencyController) Allow() bool { return c.sema.TryAcquire(1) } -func (c ConcurrencyController) Done() { +// Done releases a concurrent evaluation slot. +func (c concurrentRuleEvalController) Done() { if !c.enabled { return } diff --git a/rules/manager_test.go b/rules/manager_test.go index 2a9b3a1d73..8b3b9c08ff 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1471,7 +1471,53 @@ func TestNoDependency(t *testing.T) { }) // A group with only one rule cannot have dependencies. 
- require.False(t, group.dependencyMap.isIndependent(rule)) + require.Empty(t, group.dependencyMap) +} + +func TestDependenciesEdgeCases(t *testing.T) { + ctx := context.Background() + opts := &ManagerOptions{ + Context: ctx, + Logger: log.NewNopLogger(), + } + + t.Run("empty group", func(t *testing.T) { + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{}, // empty group + Opts: opts, + }) + + expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))") + require.NoError(t, err) + rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + + // A group with no rules has no dependency map, but doesn't panic if the map is queried. + require.Nil(t, group.dependencyMap) + require.False(t, group.dependencyMap.isIndependent(rule)) + }) + + t.Run("rules which reference no series", func(t *testing.T) { + expr, err := parser.ParseExpr("one") + require.NoError(t, err) + rule1 := NewRecordingRule("1", expr, labels.Labels{}) + + expr, err = parser.ParseExpr("two") + require.NoError(t, err) + rule2 := NewRecordingRule("2", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + // A group with rules which reference no series will still produce a dependency map + require.True(t, group.dependencyMap.isIndependent(rule1)) + require.True(t, group.dependencyMap.isIndependent(rule2)) + }) } func TestNoMetricSelector(t *testing.T) { @@ -1596,10 +1642,23 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NoError(t, err) for h, g := range ruleManager.groups { + const ruleName = "job:http_requests:rate5m" + var rr *RecordingRule + + for _, r := range g.rules { + if r.Name() == ruleName { + rr = r.(*RecordingRule) + } + } + + require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + // Dependency maps must change because the groups would've been updated. require.NotEqual(t, orig[h], g.dependencyMap) // We expect there to be some dependencies since the new rule group contains a dependency. require.Greater(t, len(g.dependencyMap), 0) + require.Equal(t, 1, g.dependencyMap.dependents(rr)) + require.Zero(t, g.dependencyMap.dependencies(rr)) } } @@ -1625,17 +1684,16 @@ func TestAsyncRuleEvaluation(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. + // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, 1) @@ -1688,7 +1746,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { for _, group := range groups { // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrencyController = NewConcurrencyController(true, 2) + group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() @@ -1749,17 +1807,16 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { inflightQueries.Add(-1) }() - // Artificially delay all query executions to highly concurrent execution improvement. 
+ // Artificially delay all query executions to highlight concurrent execution improvement. time.Sleep(artificialDelay) - // return a stub sample + // Return a stub sample. return promql.Vector{ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, }) - // Evaluate groups manually to show the impact of async rule evaluations. groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, groupCount) From 5bda33375a58b8d7b52195241136f1152f14114c Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Sat, 28 Oct 2023 13:25:06 +0200 Subject: [PATCH 06/17] Rename flag Signed-off-by: Danny Kopping --- cmd/prometheus/main.go | 2 +- docs/command-line/prometheus.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 8dd1d88fa0..830ae46490 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -416,7 +416,7 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) - serverOnlyFlag(a, "rules.max-concurrent-rule-evals", "Global concurrency limit for independent rules which can run concurrently."). + serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules which can run concurrently."). Default("4").Int64Var(&cfg.maxConcurrentEvals) a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index de3baa1071..fef8ffa54d 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -48,7 +48,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | -| --rules.max-concurrent-rule-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | +| --rules.max-concurrent-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. 
| `2m` | From f922534c4df77a35ee6c73bdf153ca89ca4da1a8 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 2 Nov 2023 20:33:06 +0200 Subject: [PATCH 07/17] Refactoring for performance, and to allow controller to be overridden Signed-off-by: Danny Kopping --- rules/group.go | 7 +- rules/manager.go | 64 ++++++++++++++---- rules/manager_test.go | 148 ++++++++++++++++++++++-------------------- 3 files changed, 134 insertions(+), 85 deletions(-) diff --git a/rules/group.go b/rules/group.go index 7f53e1b474..8de0900d1a 100644 --- a/rules/group.go +++ b/rules/group.go @@ -71,7 +71,6 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc - dependencyMap dependencyMap } // GroupEvalIterationFunc is used to implement and extend rule group @@ -130,7 +129,6 @@ func NewGroup(o GroupOptions) *Group { logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, - dependencyMap: buildDependencyMap(o.Rules), } } @@ -437,7 +435,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrentEvalsController.Done() + g.opts.RuleConcurrencyController.Done() } }() @@ -569,7 +567,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { + ctrl := g.opts.RuleConcurrencyController + if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index b982f23754..9ac95cdbd3 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -118,7 +118,7 @@ type ManagerOptions struct { GroupLoader GroupLoader MaxConcurrentEvals int64 ConcurrentEvalsEnabled bool - ConcurrentEvalsController ConcurrentRuleEvalController + RuleConcurrencyController RuleConcurrencyController Metrics *Metrics } @@ -134,7 +134,9 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + if o.RuleConcurrencyController == nil { + o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + } m := &Manager{ groups: map[string]*Group{}, @@ -182,6 +184,10 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() + if m.opts.RuleConcurrencyController != nil { + m.opts.RuleConcurrencyController.Invalidate() + } + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) if errs != nil { @@ -410,26 +416,55 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount -// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. +// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount +// of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load. 
// Concurrency is controlled globally, not on a per-group basis. -type ConcurrentRuleEvalController interface { +type RuleConcurrencyController interface { + // RuleEligible determines if a rule can be run concurrently. + RuleEligible(g *Group, r Rule) bool + + // Allow determines whether any concurrent evaluation slots are available. Allow() bool + + // Done releases a concurrent evaluation slot. Done() + + // Invalidate instructs the controller to invalidate its state. + // This should be called when groups are modified (during a reload, for instance), because the controller may + // store some state about each group in order to more efficiently determine rule eligibility. + Invalidate() +} + +func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + enabled: enabled, + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { + mu sync.Mutex enabled bool sema *semaphore.Weighted + depMaps map[*Group]dependencyMap } -func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { - return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { + c.mu.Lock() + defer c.mu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) } -// Allow determines whether any concurrency slots are available. -func (c concurrentRuleEvalController) Allow() bool { +func (c *concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -437,11 +472,18 @@ func (c concurrentRuleEvalController) Allow() bool { return c.sema.TryAcquire(1) } -// Done releases a concurrent evaluation slot. -func (c concurrentRuleEvalController) Done() { +func (c *concurrentRuleEvalController) Done() { if !c.enabled { return } c.sema.Release(1) } + +func (c *concurrentRuleEvalController) Invalidate() { + c.mu.Lock() + defer c.mu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. 
+ c.depMaps = map[*Group]dependencyMap{} +} diff --git a/rules/manager_test.go b/rules/manager_test.go index 8b3b9c08ff..47f0248eb0 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1435,21 +1435,23 @@ func TestDependencyMap(t *testing.T) { Opts: opts, }) - require.Zero(t, group.dependencyMap.dependencies(rule)) - require.Equal(t, 2, group.dependencyMap.dependents(rule)) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) - require.Zero(t, group.dependencyMap.dependents(rule2)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.Zero(t, depMap.dependencies(rule)) + require.Equal(t, 2, depMap.dependents(rule)) + require.False(t, depMap.isIndependent(rule)) - require.Zero(t, group.dependencyMap.dependents(rule3)) - require.Zero(t, group.dependencyMap.dependencies(rule3)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + require.Zero(t, depMap.dependents(rule2)) + require.Equal(t, 1, depMap.dependencies(rule2)) + require.False(t, depMap.isIndependent(rule2)) - require.Zero(t, group.dependencyMap.dependents(rule4)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) - require.False(t, group.dependencyMap.isIndependent(rule4)) + require.Zero(t, depMap.dependents(rule3)) + require.Zero(t, depMap.dependencies(rule3)) + require.True(t, depMap.isIndependent(rule3)) + + require.Zero(t, depMap.dependents(rule4)) + require.Equal(t, 1, depMap.dependencies(rule4)) + require.False(t, depMap.isIndependent(rule4)) } func TestNoDependency(t *testing.T) { @@ -1470,8 +1472,9 @@ func TestNoDependency(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with only one rule cannot have dependencies. - require.Empty(t, group.dependencyMap) + require.Empty(t, depMap) } func TestDependenciesEdgeCases(t *testing.T) { @@ -1493,9 +1496,10 @@ func TestDependenciesEdgeCases(t *testing.T) { require.NoError(t, err) rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + depMap := buildDependencyMap(group.rules) // A group with no rules has no dependency map, but doesn't panic if the map is queried. - require.Nil(t, group.dependencyMap) - require.False(t, group.dependencyMap.isIndependent(rule)) + require.Nil(t, depMap) + require.False(t, depMap.isIndependent(rule)) }) t.Run("rules which reference no series", func(t *testing.T) { @@ -1514,9 +1518,10 @@ func TestDependenciesEdgeCases(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with rules which reference no series will still produce a dependency map - require.True(t, group.dependencyMap.isIndependent(rule1)) - require.True(t, group.dependencyMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule1)) + require.True(t, depMap.isIndependent(rule2)) }) } @@ -1542,10 +1547,11 @@ func TestNoMetricSelector(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore // all rules are not considered independent. 
- require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) } func TestDependentRulesWithNonMetricExpression(t *testing.T) { @@ -1574,9 +1580,10 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule3)) } func TestRulesDependentOnMetaMetrics(t *testing.T) { @@ -1604,7 +1611,8 @@ func TestRulesDependentOnMetaMetrics(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) } func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { @@ -1623,17 +1631,19 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { orig := make(map[string]dependencyMap, len(ruleManager.groups)) for _, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // No dependency map is expected because there is only one rule in the group. - require.Empty(t, g.dependencyMap) - orig[g.Name()] = g.dependencyMap + require.Empty(t, depMap) + orig[g.Name()] = depMap } // Update once without changing groups. err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // Dependency maps are the same because of no updates. - require.Equal(t, orig[h], g.dependencyMap) + require.Equal(t, orig[h], depMap) } // Groups will be recreated when updated. @@ -1653,12 +1663,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + depMap := buildDependencyMap(g.rules) // Dependency maps must change because the groups would've been updated. - require.NotEqual(t, orig[h], g.dependencyMap) + require.NotEqual(t, orig[h], depMap) // We expect there to be some dependencies since the new rule group contains a dependency. - require.Greater(t, len(g.dependencyMap), 0) - require.Equal(t, 1, g.dependencyMap.dependents(rr)) - require.Zero(t, g.dependencyMap.dependencies(rr)) + require.Greater(t, len(depMap), 0) + require.Equal(t, 1, depMap.dependents(rr)) + require.Zero(t, depMap.dependencies(rr)) } } @@ -1674,7 +1685,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { ) files := []string{"fixtures/rules_multiple.yaml"} - ruleManager := NewManager(&ManagerOptions{ + opts := &ManagerOptions{ Context: context.Background(), Logger: log.NewNopLogger(), Appendable: storage, @@ -1692,39 +1703,42 @@ func TestAsyncRuleEvaluation(t *testing.T) { promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, - }) + } - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) 
- require.Empty(t, errs) - require.Len(t, groups, 1) + inflightTracker := func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + } expectedRules := 4 t.Run("synchronous evaluation with independent rules", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + ruleManager := NewManager(opts) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { require.Len(t, group.rules, expectedRules) start := time.Now() // Never expect more than 1 inflight query at a time. - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) @@ -1744,33 +1758,27 @@ func TestAsyncRuleEvaluation(t *testing.T) { maxInflight.Store(0) ctx, cancel := context.WithCancel(context.Background()) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { - // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) // Each rule produces one vector. 
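The commit message above calls out that the controller can now be overridden: any RuleConcurrencyController supplied via ManagerOptions.RuleConcurrencyController takes precedence over the built-in semaphore-backed implementation. A minimal sketch of a custom controller follows; the type name and the runtime toggle are illustrative, not part of this series:

    package main

    import (
    	"golang.org/x/sync/semaphore"

    	"github.com/prometheus/prometheus/rules"
    )

    // toggleableController reuses the weighted-semaphore pattern of the built-in
    // controller, but adds a hypothetical runtime kill switch.
    type toggleableController struct {
    	sema    *semaphore.Weighted
    	enabled func() bool // e.g. backed by a runtime configuration flag
    }

    func (c *toggleableController) RuleEligible(_ *rules.Group, _ rules.Rule) bool {
    	// NOTE: the built-in controller answers this from a memoized per-group
    	// dependencyMap; returning true based only on a toggle assumes the rules
    	// are known to be independent. False is always the safe default.
    	return c.enabled()
    }

    func (c *toggleableController) Allow() bool { return c.sema.TryAcquire(1) }

    func (c *toggleableController) Done() { c.sema.Release(1) }

    func (c *toggleableController) Invalidate() {} // no per-group state to drop

It would be wired in before calling rules.NewManager, e.g. opts.RuleConcurrencyController = &toggleableController{sema: semaphore.NewWeighted(4), enabled: func() bool { return true }}.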
From 7aa3b10c3fb6ae025e0173e439f54f6020a7eec8 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 5 Jan 2024 22:48:30 +0200 Subject: [PATCH 08/17] Block until all rules, both sync & async, have completed evaluating Updated & added tests Review feedback nits Return empty map if not indeterminate Use highWatermark to track inflight requests counter Appease the linter Clarify feature flag Signed-off-by: Danny Kopping --- docs/feature_flags.md | 9 +- .../fixtures/rules_multiple_independent.yaml | 15 + rules/group.go | 25 +- rules/manager.go | 17 +- rules/manager_test.go | 287 ++++++++++-------- 5 files changed, 208 insertions(+), 145 deletions(-) create mode 100644 rules/fixtures/rules_multiple_independent.yaml diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 5517018df4..95b6270104 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -217,8 +217,9 @@ Besides enabling this feature in Prometheus, created timestamps need to be expos `--enable-feature=concurrent-rule-eval` -Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the +By default, rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no -reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query -load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals` which is set -to `4` by default. +reason to run them sequentially. +When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently. +This can improve rule reliability at the expense of adding more concurrent query load. The number of concurrent rule evaluations can be configured +with `--rules.max-concurrent-rule-evals` which is set to `4` by default. diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml new file mode 100644 index 0000000000..e071be3eff --- /dev/null +++ b/rules/fixtures/rules_multiple_independent.yaml @@ -0,0 +1,15 @@ +groups: + - name: independents + rules: + - record: job:http_requests:rate1m + expr: sum by (job)(rate(http_requests_total[1m])) + - record: job:http_requests:rate5m + expr: sum by (job)(rate(http_requests_total[5m])) + - record: job:http_requests:rate15m + expr: sum by (job)(rate(http_requests_total[15m])) + - record: job:http_requests:rate30m + expr: sum by (job)(rate(http_requests_total[30m])) + - record: job:http_requests:rate1h + expr: sum by (job)(rate(http_requests_total[1h])) + - record: job:http_requests:rate2h + expr: sum by (job)(rate(http_requests_total[2h])) diff --git a/rules/group.go b/rules/group.go index 8de0900d1a..939d2cd5b6 100644 --- a/rules/group.go +++ b/rules/group.go @@ -423,8 +423,13 @@ func (g *Group) CopyState(from *Group) { } // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. 
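+// When it is, Eval still blocks until every rule, whether evaluated synchronously or asynchronously, has completed.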
func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal atomic.Float64 + var ( + samplesTotal atomic.Float64 + wg sync.WaitGroup + ) + for i, rule := range g.rules { select { case <-g.done: @@ -435,6 +440,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { + wg.Done() g.opts.RuleConcurrencyController.Done() } }() @@ -569,12 +575,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // Try run concurrently if there are slots available. ctrl := g.opts.RuleConcurrencyController if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + wg.Add(1) go eval(i, rule, true) } else { eval(i, rule, false) } } + wg.Wait() if g.metrics != nil { g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) } @@ -940,7 +948,7 @@ func buildDependencyMap(rules []Rule) dependencyMap { if len(rules) <= 1 { // No relationships if group has 1 or fewer rules. - return nil + return dependencies } inputs := make(map[string][]Rule, len(rules)) @@ -949,7 +957,9 @@ func buildDependencyMap(rules []Rule) dependencyMap { var indeterminate bool for _, rule := range rules { - rule := rule + if indeterminate { + break + } name := rule.Name() outputs[name] = append(outputs[name], rule) @@ -980,15 +990,10 @@ func buildDependencyMap(rules []Rule) dependencyMap { return nil } - if len(inputs) == 0 || len(outputs) == 0 { - // No relationships can be inferred. - return nil - } - for output, outRules := range outputs { for _, outRule := range outRules { - if rs, found := inputs[output]; found && len(rs) > 0 { - dependencies[outRule] = append(dependencies[outRule], rs...) + if inRules, found := inputs[output]; found && len(inRules) > 0 { + dependencies[outRule] = append(dependencies[outRule], inRules...) } } } diff --git a/rules/manager.go b/rules/manager.go index 9ac95cdbd3..84b43fba7d 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -424,6 +424,7 @@ type RuleConcurrencyController interface { RuleEligible(g *Group, r Rule) bool // Allow determines whether any concurrent evaluation slots are available. + // If Allow() returns true, then Done() must be called to release the acquired slot. Allow() bool // Done releases a concurrent evaluation slot. @@ -445,15 +446,15 @@ func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcur // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { - mu sync.Mutex - enabled bool - sema *semaphore.Weighted - depMaps map[*Group]dependencyMap + enabled bool + sema *semaphore.Weighted + depMapsMu sync.Mutex + depMaps map[*Group]dependencyMap } func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { - c.mu.Lock() - defer c.mu.Unlock() + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() depMap, found := c.depMaps[g] if !found { @@ -481,8 +482,8 @@ func (c *concurrentRuleEvalController) Done() { } func (c *concurrentRuleEvalController) Invalidate() { - c.mu.Lock() - defer c.mu.Unlock() + c.depMapsMu.Lock() + defer c.depMapsMu.Unlock() // Clear out the memoized dependency maps because some or all groups may have been updated. 
c.depMaps = map[*Group]dependencyMap{} diff --git a/rules/manager_test.go b/rules/manager_test.go index 47f0248eb0..2d1dc6b42e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1498,8 +1498,8 @@ func TestDependenciesEdgeCases(t *testing.T) { depMap := buildDependencyMap(group.rules) // A group with no rules has no dependency map, but doesn't panic if the map is queried. - require.Nil(t, depMap) - require.False(t, depMap.isIndependent(rule)) + require.Empty(t, depMap) + require.True(t, depMap.isIndependent(rule)) }) t.Run("rules which reference no series", func(t *testing.T) { @@ -1627,7 +1627,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) require.NoError(t, err) - require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups") + require.NotEmpty(t, ruleManager.groups, "expected non-empty rule groups") orig := make(map[string]dependencyMap, len(ruleManager.groups)) for _, g := range ruleManager.groups { @@ -1643,7 +1643,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { for h, g := range ruleManager.groups { depMap := buildDependencyMap(g.rules) // Dependency maps are the same because of no updates. - require.Equal(t, orig[h], depMap) + if orig[h] == nil { + require.Empty(t, orig[h]) + require.Empty(t, depMap) + } else { + require.Equal(t, orig[h], depMap) + } + } // Groups will be recreated when updated. @@ -1667,7 +1673,7 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { // Dependency maps must change because the groups would've been updated. require.NotEqual(t, orig[h], depMap) // We expect there to be some dependencies since the new rule group contains a dependency. - require.Greater(t, len(depMap), 0) + require.NotEmpty(t, depMap) require.Equal(t, 1, depMap.dependents(rr)) require.Zero(t, depMap.dependencies(rr)) } @@ -1677,86 +1683,51 @@ func TestAsyncRuleEvaluation(t *testing.T) { storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) - const artificialDelay = time.Second - var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 ) - files := []string{"fixtures/rules_multiple.yaml"} - opts := &ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { - inflightQueries.Add(1) - defer func() { - inflightQueries.Add(-1) - }() - - // Artificially delay all query executions to highlight concurrent execution improvement. - time.Sleep(artificialDelay) - - // Return a stub sample. - return promql.Vector{ - promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, - }, nil - }, - } - - inflightTracker := func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - } - - expectedRules := 4 - t.Run("synchronous evaluation with independent rules", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - - ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) 
- require.Empty(t, errs) - require.Len(t, groups, 1) - - for _, group := range groups { - require.Len(t, group.rules, expectedRules) - - start := time.Now() - - // Never expect more than 1 inflight query at a time. - go inflightTracker(ctx) - - group.Eval(ctx, start) - - require.EqualValues(t, 1, maxInflight.Load()) - // Each rule should take at least 1 second to execute sequentially. - require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) - // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) - } - - cancel() - }) - - t.Run("asynchronous evaluation with independent rules", func(t *testing.T) { // Reset. inflightQueries.Store(0) maxInflight.Store(0) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, 0)) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + ruleCount := 4 + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Never expect more than 1 inflight query at a time. + require.EqualValues(t, 1, maxInflight.Load()) + // Each rule should take at least 1 second to execute sequentially. + require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } + }) + + t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 4 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) // Configure concurrency settings. opts.ConcurrentEvalsEnabled = true @@ -1764,28 +1735,97 @@ func TestAsyncRuleEvaluation(t *testing.T) { opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) require.Empty(t, errs) require.Len(t, groups, 1) for _, group := range groups { - require.Len(t, group.rules, expectedRules) + require.Len(t, group.rules, ruleCount) start := time.Now() - - go inflightTracker(ctx) - group.Eval(ctx, start) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. - require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. - require.EqualValues(t, expectedRules, testutil.ToFloat64(group.metrics.GroupSamples)) + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) } + }) - cancel() + t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { + // Reset. 
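+		// The inflight counters are shared across all subtests, so they must be zeroed here.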
+ inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + group.Eval(ctx, start) + + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + + } + }) + + t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { + // Reset. + inflightQueries.Store(0) + maxInflight.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + ruleCount := 6 + opts := optsFactory(storage, &maxInflight, &inflightQueries, 0) + + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = int64(ruleCount) * 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) + require.Empty(t, errs) + require.Len(t, groups, 1) + + for _, group := range groups { + require.Len(t, group.rules, ruleCount) + + start := time.Now() + + group.Eval(ctx, start) + + // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once. + require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals) + // Some rules should execute concurrently so should complete quicker. + require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) + // Each rule produces one vector. + require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples)) + } }) } @@ -1793,8 +1833,6 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) - const artificialDelay = time.Millisecond * 100 - var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 @@ -1803,50 +1841,15 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { ) files := []string{"fixtures/rules_multiple_groups.yaml"} - ruleManager := NewManager(&ManagerOptions{ - Context: context.Background(), - Logger: log.NewNopLogger(), - Appendable: storage, - ConcurrentEvalsEnabled: true, - MaxConcurrentEvals: maxConcurrency, - QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { - inflightQueries.Add(1) - defer func() { - inflightQueries.Add(-1) - }() - // Artificially delay all query executions to highlight concurrent execution improvement. - time.Sleep(artificialDelay) - - // Return a stub sample. 
- return promql.Vector{ - promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, - }, nil - }, - }) + ruleManager := NewManager(optsFactory(storage, &maxInflight, &inflightQueries, maxConcurrency)) groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) require.Empty(t, errs) require.Len(t, groups, groupCount) ctx, cancel := context.WithCancel(context.Background()) - - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + t.Cleanup(cancel) // Evaluate groups concurrently (like they normally do). var wg sync.WaitGroup @@ -1861,8 +1864,46 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) { } wg.Wait() - cancel() // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+$groupCount inflight evaluations. require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount)) } + +const artificialDelay = 10 * time.Millisecond + +func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions { + var inflightMu sync.Mutex + + concurrent := maxConcurrent > 0 + + return &ManagerOptions{ + Context: context.Background(), + Logger: log.NewNopLogger(), + ConcurrentEvalsEnabled: concurrent, + MaxConcurrentEvals: maxConcurrent, + Appendable: storage, + QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { + inflightMu.Lock() + + current := inflightQueries.Add(1) + defer func() { + inflightQueries.Add(-1) + }() + + highWatermark := maxInflight.Load() + + if current > highWatermark { + maxInflight.Store(current) + } + inflightMu.Unlock() + + // Artificially delay all query executions to highlight concurrent execution improvement. + time.Sleep(artificialDelay) + + // Return a stub sample. + return promql.Vector{ + promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, + }, nil + }, + } +} From ac1c6eb3ef131922a9211aa25a50f7af1df197d2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:08:07 +0100 Subject: [PATCH 09/17] Fix typo in CLI flag description Signed-off-by: Marco Pracucci --- cmd/prometheus/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 830ae46490..e36665857b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -416,7 +416,7 @@ func main() { serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").SetValue(&cfg.resendDelay) - serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules which can run concurrently."). + serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently."). Default("4").Int64Var(&cfg.maxConcurrentEvals) a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release."). 
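The flag whose description is fixed above is the knob behind ManagerOptions.MaxConcurrentEvals: its value sizes the weighted semaphore held by concurrentRuleEvalController. A standalone sketch of the same acquire-or-fall-back pattern (the loop body and printed messages are illustrative):

    package main

    import (
    	"fmt"
    	"sync"

    	"golang.org/x/sync/semaphore"
    )

    func main() {
    	sema := semaphore.NewWeighted(4) // the flag's default
    	var wg sync.WaitGroup

    	for i := 0; i < 10; i++ {
    		i := i
    		if sema.TryAcquire(1) {
    			// Slot available: evaluate asynchronously and release the slot
    			// on completion, mirroring Allow()/Done() in the controller.
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				defer sema.Release(1)
    				fmt.Println("concurrent eval", i)
    			}()
    		} else {
    			// No slot free: evaluate synchronously, as Group.Eval does.
    			fmt.Println("sequential eval", i)
    		}
    	}
    	wg.Wait()
    }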
From 6bbb03bd00156e4313067c622c097e83eb70eab9 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 20:02:18 +0100 Subject: [PATCH 10/17] Fixed auto-generated doc Signed-off-by: Marco Pracucci --- docs/command-line/prometheus.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index fef8ffa54d..2faea5b15e 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -48,7 +48,7 @@ The Prometheus monitoring server | --rules.alert.for-outage-tolerance | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` | | --rules.alert.for-grace-period | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` | | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | -| --rules.max-concurrent-evals | Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` | +| --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | From 1bb341fa517392d504285ed70d428483fdfb76f1 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:09:29 +0100 Subject: [PATCH 11/17] Improve doc Signed-off-by: Marco Pracucci --- docs/feature_flags.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 95b6270104..c3540cc234 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -221,5 +221,6 @@ By default, rule groups execute concurrently, but the rules within a group execu output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no reason to run them sequentially. When the `concurrent-rule-eval` feature flag is enabled, rules without any dependency on other rules within a rule group will be evaluated concurrently. -This can improve rule reliability at the expense of adding more concurrent query load. The number of concurrent rule evaluations can be configured -with `--rules.max-concurrent-rule-evals` which is set to `4` by default. +This has the potential to improve rule group evaluation latency and resource utilization at the expense of adding more concurrent query load. + +The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-rule-evals`, which is set to `4` by default. 
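For illustration, enabling the feature together with a raised limit would look like the invocation below. Note that cmd/prometheus/main.go registers the limit as --rules.max-concurrent-evals at this point in the series, while the feature-flag paragraph above still refers to --rules.max-concurrent-rule-evals:

    ./prometheus \
        --config.file=prometheus.yml \
        --enable-feature=concurrent-rule-eval \
        --rules.max-concurrent-evals=8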
From 21a03dc018cbfc90b690cbf21e945db029b79f00 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:12:40 +0100 Subject: [PATCH 12/17] Simplify the design to update concurrency controller once the rule evaluation has done Signed-off-by: Marco Pracucci --- rules/group.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rules/group.go b/rules/group.go index 939d2cd5b6..b50189fa97 100644 --- a/rules/group.go +++ b/rules/group.go @@ -437,13 +437,10 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { default: } - eval := func(i int, rule Rule, async bool) { - defer func() { - if async { - wg.Done() - g.opts.RuleConcurrencyController.Done() - } - }() + eval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() + } logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) ctx, sp := otel.Tracer("").Start(ctx, "rule") @@ -576,9 +573,13 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { ctrl := g.opts.RuleConcurrencyController if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { wg.Add(1) - go eval(i, rule, true) + + go eval(i, rule, func() { + wg.Done() + g.opts.RuleConcurrencyController.Done() + }) } else { - eval(i, rule, false) + eval(i, rule, nil) } } From 52bc568d04da46e448fca62a075f5d92c5f1e47b Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:33:24 +0100 Subject: [PATCH 13/17] Add more test cases to TestDependenciesEdgeCases Signed-off-by: Marco Pracucci --- rules/manager_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/rules/manager_test.go b/rules/manager_test.go index 2d1dc6b42e..4762ef6f5d 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1523,6 +1523,72 @@ func TestDependenciesEdgeCases(t *testing.T) { require.True(t, depMap.isIndependent(rule1)) require.True(t, depMap.isIndependent(rule2)) }) + + t.Run("rule with regexp matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__=~".+"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with regexp matcher on metric name causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule with not equal matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__!="requests", service="app"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with not equal matcher on metric name causes the whole group to be indeterminate. 
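+		// A selector such as {__name__!="requests", service="app"} could match any
+		// other rule's output series, so no rule in the group is provably independent.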
+ require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule with not regexp matcher on metric name", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum({__name__!~"requests.+", service="app"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule with not regexp matcher on metric name causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) } func TestNoMetricSelector(t *testing.T) { From 2764c4653112e59c10a8e0af8b18ebbb6f5851a0 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:38:11 +0100 Subject: [PATCH 14/17] Added more test cases to TestDependenciesEdgeCases Signed-off-by: Marco Pracucci --- rules/manager_test.go | 44 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/rules/manager_test.go b/rules/manager_test.go index 4762ef6f5d..ed6fea2532 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1589,6 +1589,50 @@ func TestDependenciesEdgeCases(t *testing.T) { require.False(t, depMap.isIndependent(rule1)) require.False(t, depMap.isIndependent(rule2)) }) + + t.Run("rule querying ALERTS metric", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum(ALERTS{alertname="test"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule querying ALERTS metric causes the whole group to be indeterminate. + require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) + + t.Run("rule querying ALERTS_FOR_STATE metric", func(t *testing.T) { + expr, err := parser.ParseExpr("sum(requests)") + require.NoError(t, err) + rule1 := NewRecordingRule("first", expr, labels.Labels{}) + + expr, err = parser.ParseExpr(`sum(ALERTS_FOR_STATE{alertname="test"})`) + require.NoError(t, err) + rule2 := NewRecordingRule("second", expr, labels.Labels{}) + + group := NewGroup(GroupOptions{ + Name: "rule_group", + Interval: time.Second, + Rules: []Rule{rule1, rule2}, + Opts: opts, + }) + + depMap := buildDependencyMap(group.rules) + // A rule querying ALERTS_FOR_STATE metric causes the whole group to be indeterminate. 
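+		// ALERTS_FOR_STATE is written by the rule engine itself to restore alert 'for'
+		// state, so any alerting rule in the group could feed it.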
+ require.False(t, depMap.isIndependent(rule1)) + require.False(t, depMap.isIndependent(rule2)) + }) } func TestNoMetricSelector(t *testing.T) { From 23f89c18b241deeb2d2f6ea24f872e8442619641 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:39:50 +0100 Subject: [PATCH 15/17] Improved RuleConcurrencyController interface doc Signed-off-by: Marco Pracucci --- rules/manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 84b43fba7d..7da4eb88b1 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -416,11 +416,11 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount -// of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load. -// Concurrency is controlled globally, not on a per-group basis. +// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount +// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure +// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis. type RuleConcurrencyController interface { - // RuleEligible determines if a rule can be run concurrently. + // RuleEligible determines if the rule can guarantee correct results while running concurrently. RuleEligible(g *Group, r Rule) bool // Allow determines whether any concurrent evaluation slots are available. From 046cd7599f573ff0f342edbafeee8f539a8077a2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 26 Jan 2024 19:53:44 +0100 Subject: [PATCH 16/17] Introduced sequentialRuleEvalController Signed-off-by: Marco Pracucci --- rules/group.go | 42 ++++++++++++++++++++++--------------- rules/manager.go | 48 ++++++++++++++++++++++++------------------- rules/manager_test.go | 3 ++- 3 files changed, 54 insertions(+), 39 deletions(-) diff --git a/rules/group.go b/rules/group.go index b50189fa97..56648a60cc 100644 --- a/rules/group.go +++ b/rules/group.go @@ -71,6 +71,9 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc + + // concurrencyController controls the rules evaluation concurrency. 
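+	// Defaults to sequentialRuleEvalController when the manager options do not provide one.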
+ concurrencyController RuleConcurrencyController } // GroupEvalIterationFunc is used to implement and extend rule group @@ -114,21 +117,27 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } + concurrencyController := o.Opts.RuleConcurrencyController + if concurrencyController == nil { + concurrencyController = sequentialRuleEvalController{} + } + return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - evalIterationFunc: evalIterationFunc, + name: o.Name, + file: o.File, + interval: o.Interval, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, + concurrencyController: concurrencyController, } } @@ -570,13 +579,12 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - ctrl := g.opts.RuleConcurrencyController - if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { + if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() { wg.Add(1) go eval(i, rule, func() { wg.Done() - g.opts.RuleConcurrencyController.Done() + ctrl.Done() }) } else { eval(i, rule, nil) diff --git a/rules/manager.go b/rules/manager.go index 7da4eb88b1..477508dc04 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -135,7 +135,11 @@ func NewManager(o *ManagerOptions) *Manager { } if o.RuleConcurrencyController == nil { - o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + if o.ConcurrentEvalsEnabled { + o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + } else { + o.RuleConcurrencyController = sequentialRuleEvalController{} + } } m := &Manager{ @@ -184,9 +188,7 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() - if m.opts.RuleConcurrencyController != nil { - m.opts.RuleConcurrencyController.Invalidate() - } + m.opts.RuleConcurrencyController.Invalidate() groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) @@ -436,22 +438,20 @@ type RuleConcurrencyController interface { Invalidate() } -func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController { - return &concurrentRuleEvalController{ - enabled: enabled, - sema: semaphore.NewWeighted(maxConcurrency), - depMaps: map[*Group]dependencyMap{}, - } -} - // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. 
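+// It is only constructed when concurrent rule evaluation is enabled; otherwise the
+// manager falls back to sequentialRuleEvalController.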
type concurrentRuleEvalController struct { - enabled bool sema *semaphore.Weighted depMapsMu sync.Mutex depMaps map[*Group]dependencyMap } +func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } +} + func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { c.depMapsMu.Lock() defer c.depMapsMu.Unlock() @@ -466,18 +466,10 @@ func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { } func (c *concurrentRuleEvalController) Allow() bool { - if !c.enabled { - return false - } - return c.sema.TryAcquire(1) } func (c *concurrentRuleEvalController) Done() { - if !c.enabled { - return - } - c.sema.Release(1) } @@ -488,3 +480,17 @@ func (c *concurrentRuleEvalController) Invalidate() { // Clear out the memoized dependency maps because some or all groups may have been updated. c.depMaps = map[*Group]dependencyMap{} } + +// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. +type sequentialRuleEvalController struct{} + +func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { + return false +} + +func (c sequentialRuleEvalController) Allow() bool { + return false +} + +func (c sequentialRuleEvalController) Done() {} +func (c sequentialRuleEvalController) Invalidate() {} diff --git a/rules/manager_test.go b/rules/manager_test.go index ed6fea2532..7d5a2bd9fe 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -676,7 +676,8 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: st, + Appendable: st, + RuleConcurrencyController: sequentialRuleEvalController{}, }, } newGroup.CopyState(oldGroup) From cbbbd6e70ab7c60da020ce118830c014c909be1d Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 29 Jan 2024 10:21:57 +0100 Subject: [PATCH 17/17] Remove superfluous nil check in Group.metrics Signed-off-by: Marco Pracucci --- rules/group.go | 5 ++--- rules/manager_test.go | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rules/group.go b/rules/group.go index 56648a60cc..5ee06dc0ba 100644 --- a/rules/group.go +++ b/rules/group.go @@ -592,9 +592,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } wg.Wait() - if g.metrics != nil { - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) - } + + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) g.cleanupStaleSeries(ctx, ts) } diff --git a/rules/manager_test.go b/rules/manager_test.go index 7d5a2bd9fe..07ec06104d 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -679,6 +679,7 @@ func TestDeletedRuleMarkedStale(t *testing.T) { Appendable: st, RuleConcurrencyController: sequentialRuleEvalController{}, }, + metrics: NewGroupMetrics(nil), } newGroup.CopyState(oldGroup)
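Condensed, the evaluation loop this series leaves in Group.Eval looks as follows (tracing, sample accounting, and stale-series handling elided; the identifiers are the real ones from the hunks above):

    var wg sync.WaitGroup
    for i, rule := range g.rules {
    	eval := func(i int, rule Rule, cleanup func()) {
    		if cleanup != nil {
    			defer cleanup()
    		}
    		// ... evaluate the rule and append its samples ...
    	}

    	// With the feature flag off, sequentialRuleEvalController returns false
    	// from both RuleEligible and Allow, so every rule takes the else branch.
    	if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() {
    		wg.Add(1)
    		go eval(i, rule, func() {
    			wg.Done()
    			ctrl.Done()
    		})
    	} else {
    		eval(i, rule, nil)
    	}
    }
    wg.Wait() // block until every rule, sync and async, has completed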