mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-26 05:51:01 +01:00 
			
		
		
		
	Saw some panic on main due to lack of defaulting: https://github.com/prometheus/prometheus/actions/runs/17317373582/job/49162760911 Signed-off-by: bwplotka <bwplotka@gmail.com>
		
			
				
	
	
		
			624 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			624 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2013 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package rules
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"log/slog"
 | |
| 	maps0 "maps"
 | |
| 	"net/url"
 | |
| 	"path/filepath"
 | |
| 	"slices"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/common/model"
 | |
| 	"github.com/prometheus/common/promslog"
 | |
| 	"golang.org/x/sync/semaphore"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/model/rulefmt"
 | |
| 	"github.com/prometheus/prometheus/notifier"
 | |
| 	"github.com/prometheus/prometheus/promql"
 | |
| 	"github.com/prometheus/prometheus/promql/parser"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| 	"github.com/prometheus/prometheus/util/strutil"
 | |
| )
 | |
| 
 | |
// QueryFunc processes PromQL queries.
// It receives the query string q and the evaluation time t and returns the
// result as a promql.Vector, or an error.
type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error)
 | |
| 
 | |
| // EngineQueryFunc returns a new query function that executes instant queries against
 | |
| // the given engine.
 | |
| // It converts scalar into vector results.
 | |
| func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable) QueryFunc {
 | |
| 	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
 | |
| 		q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		res := q.Exec(ctx)
 | |
| 		if res.Err != nil {
 | |
| 			return nil, res.Err
 | |
| 		}
 | |
| 		switch v := res.Value.(type) {
 | |
| 		case promql.Vector:
 | |
| 			return v, nil
 | |
| 		case promql.Scalar:
 | |
| 			return promql.Vector{promql.Sample{
 | |
| 				T:      v.T,
 | |
| 				F:      v.V,
 | |
| 				Metric: labels.Labels{},
 | |
| 			}}, nil
 | |
| 		default:
 | |
| 			return nil, errors.New("rule result is not a vector or scalar")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // DefaultEvalIterationFunc is the default implementation of
 | |
| // GroupEvalIterationFunc that is periodically invoked to evaluate the rules
 | |
| // in a group at a given point in time and updates Group state and metrics
 | |
| // accordingly. Custom GroupEvalIterationFunc implementations are recommended
 | |
| // to invoke this function as well, to ensure correct Group state and metrics
 | |
| // are maintained.
 | |
| func DefaultEvalIterationFunc(ctx context.Context, g *Group, evalTimestamp time.Time) {
 | |
| 	g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
 | |
| 
 | |
| 	start := time.Now()
 | |
| 	g.Eval(ctx, evalTimestamp)
 | |
| 	timeSinceStart := time.Since(start)
 | |
| 
 | |
| 	g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
 | |
| 	g.updateRuleEvaluationTimeSum()
 | |
| 	g.setEvaluationTime(timeSinceStart)
 | |
| 	g.setLastEvaluation(start)
 | |
| 	g.setLastEvalTimestamp(evalTimestamp)
 | |
| }
 | |
| 
 | |
// The Manager manages recording and alerting rules.
type Manager struct {
	// opts holds the options passed to NewManager, after defaulting.
	opts *ManagerOptions
	// groups holds the currently loaded groups, keyed by GroupKey(file, name).
	groups map[string]*Group
	// mtx is held by Stop, Update, RuleGroups and Rules while they access groups.
	mtx sync.RWMutex
	// block is closed by start; group goroutines launched by Update wait on
	// it before running.
	block chan struct{}
	// done is closed by Stop; Run blocks on it and Update becomes a no-op
	// once it is closed.
	done chan struct{}
	// restored is set to true by Update after groups were loaded successfully.
	restored bool
	// restoreNewRuleGroups is copied from ManagerOptions.RestoreNewRuleGroups.
	restoreNewRuleGroups bool

	logger *slog.Logger
}
 | |
| 
 | |
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
// expr is the expression that produced the alerts.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
 | |
| 
 | |
// ManagerOptions bundles options for the Manager.
//
// NewManager fills in defaults, in place, for options that are left unset:
// NameValidationScheme (model.UTF8Validation), Context (context.Background()),
// Metrics (NewGroupMetrics(Registerer)), GroupLoader (FileLoader{}),
// RuleConcurrencyController (bounded by MaxConcurrentEvals when
// ConcurrentEvalsEnabled is true, sequential otherwise),
// RuleDependencyController and Logger (a no-op logger).
type ManagerOptions struct {
	NameValidationScheme      model.ValidationScheme
	ExternalURL               *url.URL
	QueryFunc                 QueryFunc
	NotifyFunc                NotifyFunc
	Context                   context.Context
	Appendable                storage.Appendable
	Queryable                 storage.Queryable
	Logger                    *slog.Logger
	Registerer                prometheus.Registerer
	OutageTolerance           time.Duration
	ForGracePeriod            time.Duration
	ResendDelay               time.Duration
	GroupLoader               GroupLoader
	DefaultRuleQueryOffset    func() time.Duration
	MaxConcurrentEvals        int64
	ConcurrentEvalsEnabled    bool
	RuleConcurrencyController RuleConcurrencyController
	RuleDependencyController  RuleDependencyController
	// At present, the manager only restores `for` state when it is newly created, which happens
	// during restarts. This flag provides an option to restore the `for` state when new rule groups are
	// added to an existing manager.
	RestoreNewRuleGroups bool

	Metrics *Metrics
}
 | |
| 
 | |
| // NewManager returns an implementation of Manager, ready to be started
 | |
| // by calling the Run method.
 | |
| func NewManager(o *ManagerOptions) *Manager {
 | |
| 	switch o.NameValidationScheme {
 | |
| 	case model.UTF8Validation, model.LegacyValidation:
 | |
| 	case model.UnsetValidation:
 | |
| 		o.NameValidationScheme = model.UTF8Validation
 | |
| 	default:
 | |
| 		panic(fmt.Errorf("unrecognized name validation scheme: %s", o.NameValidationScheme))
 | |
| 	}
 | |
| 	if o.Context == nil {
 | |
| 		o.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	if o.Metrics == nil {
 | |
| 		o.Metrics = NewGroupMetrics(o.Registerer)
 | |
| 	}
 | |
| 
 | |
| 	if o.GroupLoader == nil {
 | |
| 		o.GroupLoader = FileLoader{}
 | |
| 	}
 | |
| 
 | |
| 	if o.RuleConcurrencyController == nil {
 | |
| 		if o.ConcurrentEvalsEnabled {
 | |
| 			o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals)
 | |
| 		} else {
 | |
| 			o.RuleConcurrencyController = sequentialRuleEvalController{}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if o.RuleDependencyController == nil {
 | |
| 		o.RuleDependencyController = ruleDependencyController{}
 | |
| 	}
 | |
| 
 | |
| 	if o.Logger == nil {
 | |
| 		o.Logger = promslog.NewNopLogger()
 | |
| 	}
 | |
| 
 | |
| 	m := &Manager{
 | |
| 		groups:               map[string]*Group{},
 | |
| 		opts:                 o,
 | |
| 		block:                make(chan struct{}),
 | |
| 		done:                 make(chan struct{}),
 | |
| 		logger:               o.Logger,
 | |
| 		restoreNewRuleGroups: o.RestoreNewRuleGroups,
 | |
| 	}
 | |
| 
 | |
| 	return m
 | |
| }
 | |
| 
 | |
// Run starts processing of the rule manager. It is blocking.
// It releases the groups waiting for the manager to start and then blocks
// until Stop closes the manager's done channel.
func (m *Manager) Run() {
	m.logger.Info("Starting rule manager...")
	m.start()
	<-m.done
}
 | |
| 
 | |
// start closes the block channel, which releases the group goroutines
// launched by Update that are waiting on it before running. Since it closes
// a channel, it must be called at most once.
func (m *Manager) start() {
	close(m.block)
}
 | |
| 
 | |
| // Stop the rule manager's rule evaluation cycles.
 | |
| func (m *Manager) Stop() {
 | |
| 	m.mtx.Lock()
 | |
| 	defer m.mtx.Unlock()
 | |
| 
 | |
| 	m.logger.Info("Stopping rule manager...")
 | |
| 
 | |
| 	// Stop all groups asynchronously, then wait for them to finish.
 | |
| 	// This is faster than stopping and waiting for each group in sequence.
 | |
| 	for _, eg := range m.groups {
 | |
| 		eg.stopAsync()
 | |
| 	}
 | |
| 
 | |
| 	for _, eg := range m.groups {
 | |
| 		eg.waitStopped()
 | |
| 	}
 | |
| 
 | |
| 	// Shut down the groups waiting multiple evaluation intervals to write
 | |
| 	// staleness markers.
 | |
| 	close(m.done)
 | |
| 
 | |
| 	m.logger.Info("Rule manager stopped")
 | |
| }
 | |
| 
 | |
| // Update the rule manager's state as the config requires. If
 | |
| // loading the new rules failed the old rule set is restored.
 | |
| // This method will no-op in case the manager is already stopped.
 | |
| func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc) error {
 | |
| 	m.mtx.Lock()
 | |
| 	defer m.mtx.Unlock()
 | |
| 
 | |
| 	// We cannot update a stopped manager
 | |
| 	select {
 | |
| 	case <-m.done:
 | |
| 		return nil
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, false, files...)
 | |
| 
 | |
| 	if errs != nil {
 | |
| 		for _, e := range errs {
 | |
| 			m.logger.Error("loading groups failed", "err", e)
 | |
| 		}
 | |
| 		return errors.New("error loading rules, previous rule set restored")
 | |
| 	}
 | |
| 	m.restored = true
 | |
| 
 | |
| 	var wg sync.WaitGroup
 | |
| 	for _, newg := range groups {
 | |
| 		// If there is an old group with the same identifier,
 | |
| 		// check if new group equals with the old group, if yes then skip it.
 | |
| 		// If not equals, stop it and wait for it to finish the current iteration.
 | |
| 		// Then copy it into the new group.
 | |
| 		gn := GroupKey(newg.file, newg.name)
 | |
| 		oldg, ok := m.groups[gn]
 | |
| 		delete(m.groups, gn)
 | |
| 
 | |
| 		if ok && oldg.Equals(newg) {
 | |
| 			groups[gn] = oldg
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		wg.Add(1)
 | |
| 		go func(newg *Group) {
 | |
| 			if ok {
 | |
| 				oldg.stop()
 | |
| 				newg.CopyState(oldg)
 | |
| 			}
 | |
| 			wg.Done()
 | |
| 			// Wait with starting evaluation until the rule manager
 | |
| 			// is told to run. This is necessary to avoid running
 | |
| 			// queries against a bootstrapping storage.
 | |
| 			<-m.block
 | |
| 			newg.run(m.opts.Context)
 | |
| 		}(newg)
 | |
| 	}
 | |
| 
 | |
| 	// Stop remaining old groups.
 | |
| 	wg.Add(len(m.groups))
 | |
| 	for n, oldg := range m.groups {
 | |
| 		go func(n string, g *Group) {
 | |
| 			g.markStale = true
 | |
| 			g.stop()
 | |
| 			if m := g.metrics; m != nil {
 | |
| 				m.IterationsMissed.DeleteLabelValues(n)
 | |
| 				m.IterationsScheduled.DeleteLabelValues(n)
 | |
| 				m.EvalTotal.DeleteLabelValues(n)
 | |
| 				m.EvalFailures.DeleteLabelValues(n)
 | |
| 				m.GroupInterval.DeleteLabelValues(n)
 | |
| 				m.GroupLastEvalTime.DeleteLabelValues(n)
 | |
| 				m.GroupLastDuration.DeleteLabelValues(n)
 | |
| 				m.GroupRules.DeleteLabelValues(n)
 | |
| 				m.GroupSamples.DeleteLabelValues((n))
 | |
| 			}
 | |
| 			wg.Done()
 | |
| 		}(n, oldg)
 | |
| 	}
 | |
| 
 | |
| 	wg.Wait()
 | |
| 	m.groups = groups
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them.
type GroupLoader interface {
	// Load loads the rule groups identified by identifier. It returns the
	// loaded groups, or the errors encountered while loading them.
	Load(identifier string, ignoreUnknownFields bool, nameValidationScheme model.ValidationScheme) (*rulefmt.RuleGroups, []error)
	// Parse parses the given query string into an expression.
	Parse(query string) (parser.Expr, error)
}
 | |
| 
 | |
// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile
// and parser.ParseExpr.
type FileLoader struct{}

// Load implements GroupLoader by passing identifier, as a file name, to
// rulefmt.ParseFile.
func (FileLoader) Load(identifier string, ignoreUnknownFields bool, nameValidationScheme model.ValidationScheme) (*rulefmt.RuleGroups, []error) {
	return rulefmt.ParseFile(identifier, ignoreUnknownFields, nameValidationScheme)
}

// Parse implements GroupLoader by calling parser.ParseExpr.
func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) }
 | |
| 
 | |
| // LoadGroups reads groups from a list of files.
 | |
| func (m *Manager) LoadGroups(
 | |
| 	interval time.Duration, externalLabels labels.Labels, externalURL string, groupEvalIterationFunc GroupEvalIterationFunc, ignoreUnknownFields bool, filenames ...string,
 | |
| ) (map[string]*Group, []error) {
 | |
| 	groups := make(map[string]*Group)
 | |
| 
 | |
| 	shouldRestore := !m.restored || m.restoreNewRuleGroups
 | |
| 
 | |
| 	for _, fn := range filenames {
 | |
| 		rgs, errs := m.opts.GroupLoader.Load(fn, ignoreUnknownFields, m.opts.NameValidationScheme)
 | |
| 		if errs != nil {
 | |
| 			return nil, errs
 | |
| 		}
 | |
| 
 | |
| 		for _, rg := range rgs.Groups {
 | |
| 			itv := interval
 | |
| 			if rg.Interval != 0 {
 | |
| 				itv = time.Duration(rg.Interval)
 | |
| 			}
 | |
| 
 | |
| 			rules := make([]Rule, 0, len(rg.Rules))
 | |
| 			for _, r := range rg.Rules {
 | |
| 				expr, err := m.opts.GroupLoader.Parse(r.Expr)
 | |
| 				if err != nil {
 | |
| 					return nil, []error{fmt.Errorf("%s: %w", fn, err)}
 | |
| 				}
 | |
| 
 | |
| 				mLabels := FromMaps(rg.Labels, r.Labels)
 | |
| 
 | |
| 				if r.Alert != "" {
 | |
| 					rules = append(rules, NewAlertingRule(
 | |
| 						r.Alert,
 | |
| 						expr,
 | |
| 						time.Duration(r.For),
 | |
| 						time.Duration(r.KeepFiringFor),
 | |
| 						mLabels,
 | |
| 						labels.FromMap(r.Annotations),
 | |
| 						externalLabels,
 | |
| 						externalURL,
 | |
| 						!shouldRestore,
 | |
| 						m.logger.With("alert", r.Alert),
 | |
| 					))
 | |
| 					continue
 | |
| 				}
 | |
| 				rules = append(rules, NewRecordingRule(
 | |
| 					r.Record,
 | |
| 					expr,
 | |
| 					mLabels,
 | |
| 				))
 | |
| 			}
 | |
| 
 | |
| 			// Check dependencies between rules and store it on the Rule itself.
 | |
| 			m.opts.RuleDependencyController.AnalyseRules(rules)
 | |
| 
 | |
| 			groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 | |
| 				Name:              rg.Name,
 | |
| 				File:              fn,
 | |
| 				Interval:          itv,
 | |
| 				Limit:             rg.Limit,
 | |
| 				Rules:             rules,
 | |
| 				ShouldRestore:     shouldRestore,
 | |
| 				Opts:              m.opts,
 | |
| 				QueryOffset:       (*time.Duration)(rg.QueryOffset),
 | |
| 				done:              m.done,
 | |
| 				EvalIterationFunc: groupEvalIterationFunc,
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return groups, nil
 | |
| }
 | |
| 
 | |
| // RuleGroups returns the list of manager's rule groups.
 | |
| func (m *Manager) RuleGroups() []*Group {
 | |
| 	m.mtx.RLock()
 | |
| 	defer m.mtx.RUnlock()
 | |
| 
 | |
| 	rgs := make([]*Group, 0, len(m.groups))
 | |
| 	for _, g := range m.groups {
 | |
| 		rgs = append(rgs, g)
 | |
| 	}
 | |
| 
 | |
| 	slices.SortFunc(rgs, func(a, b *Group) int {
 | |
| 		fileCompare := strings.Compare(a.file, b.file)
 | |
| 
 | |
| 		// If its 0, then the file names are the same.
 | |
| 		// Lets look at the group names in that case.
 | |
| 		if fileCompare != 0 {
 | |
| 			return fileCompare
 | |
| 		}
 | |
| 		return strings.Compare(a.name, b.name)
 | |
| 	})
 | |
| 
 | |
| 	return rgs
 | |
| }
 | |
| 
 | |
| // Rules returns the list of the manager's rules.
 | |
| func (m *Manager) Rules(matcherSets ...[]*labels.Matcher) []Rule {
 | |
| 	m.mtx.RLock()
 | |
| 	defer m.mtx.RUnlock()
 | |
| 
 | |
| 	var rules []Rule
 | |
| 	for _, g := range m.groups {
 | |
| 		rules = append(rules, g.Rules(matcherSets...)...)
 | |
| 	}
 | |
| 
 | |
| 	return rules
 | |
| }
 | |
| 
 | |
| // AlertingRules returns the list of the manager's alerting rules.
 | |
| func (m *Manager) AlertingRules() []*AlertingRule {
 | |
| 	alerts := []*AlertingRule{}
 | |
| 	for _, rule := range m.Rules() {
 | |
| 		if alertingRule, ok := rule.(*AlertingRule); ok {
 | |
| 			alerts = append(alerts, alertingRule)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return alerts
 | |
| }
 | |
| 
 | |
// Sender is the destination SendAlerts hands converted alerts to.
type Sender interface {
	// Send delivers the given alerts.
	Send(alerts ...*notifier.Alert)
}
 | |
| 
 | |
| // SendAlerts implements the rules.NotifyFunc for a Notifier.
 | |
| func SendAlerts(s Sender, externalURL string) NotifyFunc {
 | |
| 	return func(_ context.Context, expr string, alerts ...*Alert) {
 | |
| 		var res []*notifier.Alert
 | |
| 
 | |
| 		for _, alert := range alerts {
 | |
| 			a := ¬ifier.Alert{
 | |
| 				StartsAt:     alert.FiredAt,
 | |
| 				Labels:       alert.Labels,
 | |
| 				Annotations:  alert.Annotations,
 | |
| 				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
 | |
| 			}
 | |
| 			if !alert.ResolvedAt.IsZero() {
 | |
| 				a.EndsAt = alert.ResolvedAt
 | |
| 			} else {
 | |
| 				a.EndsAt = alert.ValidUntil
 | |
| 			}
 | |
| 			res = append(res, a)
 | |
| 		}
 | |
| 
 | |
| 		if len(alerts) > 0 {
 | |
| 			s.Send(res...)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// RuleDependencyController determines whether the rules in a set depend on each other.
type RuleDependencyController interface {
	// AnalyseRules analyses dependencies between the input rules. For each rule that is guaranteed
	// not to have any dependants and/or dependencies, this function should call Rule.SetDependentRules(...)
	// and/or Rule.SetDependencyRules(...).
	AnalyseRules(rules []Rule)
}
 | |
| 
 | |
| type ruleDependencyController struct{}
 | |
| 
 | |
| // AnalyseRules implements RuleDependencyController.
 | |
| func (ruleDependencyController) AnalyseRules(rules []Rule) {
 | |
| 	depMap := buildDependencyMap(rules)
 | |
| 
 | |
| 	if depMap == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for _, r := range rules {
 | |
| 		r.SetDependentRules(depMap.dependents(r))
 | |
| 		r.SetDependencyRules(depMap.dependencies(r))
 | |
| 	}
 | |
| }
 | |
| 
 | |
// ConcurrentRules represents a slice of indexes of rules that can be evaluated concurrently.
// The indexes refer to the positions of the rules within their rule group.
type ConcurrentRules []int
 | |
| 
 | |
// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
	// SplitGroupIntoBatches returns an ordered slice of ConcurrentRules, which are batches of rules that can be evaluated concurrently.
	// The rules are represented by their index in the input rule group.
	SplitGroupIntoBatches(ctx context.Context, group *Group) []ConcurrentRules

	// Allow determines if the given rule is allowed to be evaluated concurrently.
	// If Allow() returns true, then Done() must be called to release the acquired slot and perform the corresponding cleanup.
	// It is important that both *Group and Rule are not retained and are only used for the duration of the call.
	Allow(ctx context.Context, group *Group, rule Rule) bool

	// Done releases a concurrent evaluation slot.
	Done(ctx context.Context)
}
 | |
| 
 | |
// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
type concurrentRuleEvalController struct {
	sema *semaphore.Weighted
}

// newRuleConcurrencyController returns a RuleConcurrencyController backed by
// a weighted semaphore of size maxConcurrency.
func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController {
	return &concurrentRuleEvalController{
		sema: semaphore.NewWeighted(maxConcurrency),
	}
}

// Allow tries to acquire one unit of the semaphore without blocking and
// reports whether that succeeded. The context, group and rule are ignored.
func (c *concurrentRuleEvalController) Allow(context.Context, *Group, Rule) bool {
	return c.sema.TryAcquire(1)
}
 | |
| 
 | |
| func (*concurrentRuleEvalController) SplitGroupIntoBatches(_ context.Context, g *Group) []ConcurrentRules {
 | |
| 	// Using the rule dependency controller information (rules being identified as having no dependencies or no dependants),
 | |
| 	// we can safely run the following concurrent groups:
 | |
| 	// 1. Concurrently, all rules that have no dependencies
 | |
| 	// 2. Sequentially, all rules that have both dependencies and dependants
 | |
| 	// 3. Concurrently, all rules that have no dependants
 | |
| 
 | |
| 	var noDependencies []int
 | |
| 	var dependenciesAndDependants []int
 | |
| 	var noDependants []int
 | |
| 
 | |
| 	for i, r := range g.rules {
 | |
| 		switch {
 | |
| 		case r.NoDependencyRules():
 | |
| 			noDependencies = append(noDependencies, i)
 | |
| 		case !r.NoDependentRules() && !r.NoDependencyRules():
 | |
| 			dependenciesAndDependants = append(dependenciesAndDependants, i)
 | |
| 		case r.NoDependentRules():
 | |
| 			noDependants = append(noDependants, i)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var order []ConcurrentRules
 | |
| 	if len(noDependencies) > 0 {
 | |
| 		order = append(order, noDependencies)
 | |
| 	}
 | |
| 	for _, r := range dependenciesAndDependants {
 | |
| 		order = append(order, []int{r})
 | |
| 	}
 | |
| 	if len(noDependants) > 0 {
 | |
| 		order = append(order, noDependants)
 | |
| 	}
 | |
| 
 | |
| 	return order
 | |
| }
 | |
| 
 | |
// Done releases one unit of the semaphore. The context is ignored.
func (c *concurrentRuleEvalController) Done(context.Context) {
	c.sema.Release(1)
}
 | |
| 
 | |
// Compile-time check that sequentialRuleEvalController implements RuleConcurrencyController.
var _ RuleConcurrencyController = &sequentialRuleEvalController{}

// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
type sequentialRuleEvalController struct{}

// Allow always returns false: no rule may be evaluated concurrently.
func (sequentialRuleEvalController) Allow(context.Context, *Group, Rule) bool {
	return false
}

// SplitGroupIntoBatches always returns nil, i.e. no batches.
func (sequentialRuleEvalController) SplitGroupIntoBatches(context.Context, *Group) []ConcurrentRules {
	return nil
}

// Done is a no-op, as Allow never hands out a slot.
func (sequentialRuleEvalController) Done(context.Context) {}
 | |
| 
 | |
| // FromMaps returns new sorted Labels from the given maps, overriding each other in order.
 | |
| func FromMaps(maps ...map[string]string) labels.Labels {
 | |
| 	mLables := make(map[string]string)
 | |
| 
 | |
| 	for _, m := range maps {
 | |
| 		maps0.Copy(mLables, m)
 | |
| 	}
 | |
| 
 | |
| 	return labels.FromMap(mLables)
 | |
| }
 | |
| 
 | |
| // ParseFiles parses the rule files corresponding to glob patterns.
 | |
| func ParseFiles(patterns []string, nameValidationScheme model.ValidationScheme) error {
 | |
| 	files := map[string]string{}
 | |
| 	for _, pat := range patterns {
 | |
| 		fns, err := filepath.Glob(pat)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("failed retrieving rule files for %q: %w", pat, err)
 | |
| 		}
 | |
| 		for _, fn := range fns {
 | |
| 			absPath, err := filepath.Abs(fn)
 | |
| 			if err != nil {
 | |
| 				absPath = fn
 | |
| 			}
 | |
| 			cleanPath, err := filepath.EvalSymlinks(absPath)
 | |
| 			if err != nil {
 | |
| 				return fmt.Errorf("failed evaluating rule file path %q (pattern: %q): %w", absPath, pat, err)
 | |
| 			}
 | |
| 			files[cleanPath] = pat
 | |
| 		}
 | |
| 	}
 | |
| 	for fn, pat := range files {
 | |
| 		_, errs := rulefmt.ParseFile(fn, false, nameValidationScheme)
 | |
| 		if len(errs) > 0 {
 | |
| 			return fmt.Errorf("parse rules from file %q (pattern: %q): %w", fn, pat, errors.Join(errs...))
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |