diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index c0bd136fa8..fa953874ae 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -58,8 +58,6 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
- "github.com/prometheus/prometheus/discovery/legacymanager"
- "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@@ -159,7 +157,6 @@ type flagConfig struct {
// These options are extracted from featureList
// for ease of use.
enableExpandExternalLabels bool
- enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableAutoGOMEMLIMIT bool
@@ -197,9 +194,6 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "metadata-wal-records":
c.scrape.AppendMetadata = true
level.Info(logger).Log("msg", "Experimental metadata records in WAL enabled, required for remote write 2.0")
- case "new-service-discovery-manager":
- c.enableNewSDManager = true
- level.Info(logger).Log("msg", "Experimental service discovery manager")
case "promql-per-step-stats":
c.enablePerStepStats = true
level.Info(logger).Log("msg", "Experimental per-step statistics reporting")
@@ -463,7 +457,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
- a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+ a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
@@ -651,8 +645,8 @@ func main() {
ctxScrape, cancelScrape = context.WithCancel(context.Background())
ctxNotify, cancelNotify = context.WithCancel(context.Background())
- discoveryManagerScrape discoveryManager
- discoveryManagerNotify discoveryManager
+ discoveryManagerScrape *discovery.Manager
+ discoveryManagerNotify *discovery.Manager
)
// Kubernetes client metrics are used by Kubernetes SD.
@@ -672,42 +666,16 @@ func main() {
os.Exit(1)
}
- if cfg.enableNewSDManager {
- {
- discMgr := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
- if discMgr == nil {
- level.Error(logger).Log("msg", "failed to create a discovery manager scrape")
- os.Exit(1)
- }
- discoveryManagerScrape = discMgr
- }
+ discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
+ if discoveryManagerScrape == nil {
+ level.Error(logger).Log("msg", "failed to create a discovery manager scrape")
+ os.Exit(1)
+ }
- {
- discMgr := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"))
- if discMgr == nil {
- level.Error(logger).Log("msg", "failed to create a discovery manager notify")
- os.Exit(1)
- }
- discoveryManagerNotify = discMgr
- }
- } else {
- {
- discMgr := legacymanager.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, legacymanager.Name("scrape"))
- if discMgr == nil {
- level.Error(logger).Log("msg", "failed to create a discovery manager scrape")
- os.Exit(1)
- }
- discoveryManagerScrape = discMgr
- }
-
- {
- discMgr := legacymanager.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, legacymanager.Name("notify"))
- if discMgr == nil {
- level.Error(logger).Log("msg", "failed to create a discovery manager notify")
- os.Exit(1)
- }
- discoveryManagerNotify = discMgr
- }
+ discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"))
+ if discoveryManagerNotify == nil {
+ level.Error(logger).Log("msg", "failed to create a discovery manager notify")
+ os.Exit(1)
}
scrapeManager, err := scrape.NewManager(
@@ -1765,15 +1733,6 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
}
}
-// discoveryManager interfaces the discovery manager. This is used to keep using
-// the manager that restarts SD's on reload for a few releases until we feel
-// the new manager can be enabled for all users.
-type discoveryManager interface {
- ApplyConfig(cfg map[string]discovery.Configs) error
- Run() error
- SyncCh() <-chan map[string][]*targetgroup.Group
-}
-
// rwProtoMsgFlagParser is a custom parser for config.RemoteWriteProtoMsg enum.
type rwProtoMsgFlagParser struct {
msgs *[]config.RemoteWriteProtoMsg
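
With the discoveryManager interface gone, both managers above are concrete *discovery.Manager values. Below is a minimal sketch of the resulting wiring, using only discovery APIs that appear elsewhere in this diff (NewRefreshMetrics, RegisterSDMetrics, NewManager, Run, ApplyConfig, SyncCh); the variable names are illustrative, not the exact main.go code:

package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/discovery"
)

func main() {
	ctx := context.Background()
	reg := prometheus.NewRegistry()

	// Metric plumbing as in the (now deleted) legacymanager tests.
	refreshMetrics := discovery.NewRefreshMetrics(reg)
	sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics)
	if err != nil {
		os.Exit(1)
	}

	m := discovery.NewManager(ctx, log.NewNopLogger(), reg, sdMetrics, discovery.Name("scrape"))
	if m == nil { // NewManager returns nil when its metrics cannot be registered.
		os.Exit(1)
	}
	go m.Run()

	// One job with an empty static group, mirroring how ApplyConfig is fed in main.go.
	_ = m.ApplyConfig(map[string]discovery.Configs{
		"prometheus": {discovery.StaticConfig{{}}},
	})
	tgs := <-m.SyncCh() // target groups keyed by job name
	_ = tgs
}
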
diff --git a/discovery/legacymanager/manager.go b/discovery/legacymanager/manager.go
deleted file mode 100644
index 6fc61485d1..0000000000
--- a/discovery/legacymanager/manager.go
+++ /dev/null
@@ -1,332 +0,0 @@
-// Copyright 2016 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package legacymanager
-
-import (
- "context"
- "fmt"
- "reflect"
- "sync"
- "time"
-
- "github.com/go-kit/log"
- "github.com/go-kit/log/level"
- "github.com/prometheus/client_golang/prometheus"
-
- "github.com/prometheus/prometheus/discovery"
- "github.com/prometheus/prometheus/discovery/targetgroup"
-)
-
-type poolKey struct {
- setName string
- provider string
-}
-
-// provider holds a Discoverer instance, its configuration and its subscribers.
-type provider struct {
- name string
- d discovery.Discoverer
- subs []string
- config interface{}
-}
-
-// NewManager is the Discovery Manager constructor.
-func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics, options ...func(*Manager)) *Manager {
- if logger == nil {
- logger = log.NewNopLogger()
- }
- mgr := &Manager{
- logger: logger,
- syncCh: make(chan map[string][]*targetgroup.Group),
- targets: make(map[poolKey]map[string]*targetgroup.Group),
- discoverCancel: []context.CancelFunc{},
- ctx: ctx,
- updatert: 5 * time.Second,
- triggerSend: make(chan struct{}, 1),
- registerer: registerer,
- sdMetrics: sdMetrics,
- }
- for _, option := range options {
- option(mgr)
- }
-
- // Register the metrics.
- // We have to do this after setting all options, so that the name of the Manager is set.
- if metrics, err := discovery.NewManagerMetrics(registerer, mgr.name); err == nil {
- mgr.metrics = metrics
- } else {
- level.Error(logger).Log("msg", "Failed to create discovery manager metrics", "manager", mgr.name, "err", err)
- return nil
- }
-
- return mgr
-}
-
-// Name sets the name of the manager.
-func Name(n string) func(*Manager) {
- return func(m *Manager) {
- m.mtx.Lock()
- defer m.mtx.Unlock()
- m.name = n
- }
-}
-
-// Manager maintains a set of discovery providers and sends each update to a map channel.
-// Targets are grouped by the target set name.
-type Manager struct {
- logger log.Logger
- name string
- mtx sync.RWMutex
- ctx context.Context
- discoverCancel []context.CancelFunc
-
- // Some Discoverers(eg. k8s) send only the updates for a given target group
- // so we use map[tg.Source]*targetgroup.Group to know which group to update.
- targets map[poolKey]map[string]*targetgroup.Group
- // providers keeps track of SD providers.
- providers []*provider
- // The sync channel sends the updates as a map where the key is the job value from the scrape config.
- syncCh chan map[string][]*targetgroup.Group
-
- // How long to wait before sending updates to the channel. The variable
- // should only be modified in unit tests.
- updatert time.Duration
-
- // The triggerSend channel signals to the manager that new updates have been received from providers.
- triggerSend chan struct{}
-
- // A registerer for all service discovery metrics.
- registerer prometheus.Registerer
-
- metrics *discovery.Metrics
- sdMetrics map[string]discovery.DiscovererMetrics
-}
-
-// Run starts the background processing.
-func (m *Manager) Run() error {
- go m.sender()
- <-m.ctx.Done()
- m.cancelDiscoverers()
- return m.ctx.Err()
-}
-
-// SyncCh returns a read only channel used by all the clients to receive target updates.
-func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
- return m.syncCh
-}
-
-// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
-func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error {
- m.mtx.Lock()
- defer m.mtx.Unlock()
-
- for pk := range m.targets {
- if _, ok := cfg[pk.setName]; !ok {
- m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName)
- }
- }
- m.cancelDiscoverers()
- m.targets = make(map[poolKey]map[string]*targetgroup.Group)
- m.providers = nil
- m.discoverCancel = nil
-
- failedCount := 0
- for name, scfg := range cfg {
- failedCount += m.registerProviders(scfg, name)
- m.metrics.DiscoveredTargets.WithLabelValues(name).Set(0)
- }
- m.metrics.FailedConfigs.Set(float64(failedCount))
-
- for _, prov := range m.providers {
- m.startProvider(m.ctx, prov)
- }
-
- return nil
-}
-
-// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
-func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) {
- p := &provider{
- name: name,
- d: worker,
- subs: []string{name},
- }
- m.providers = append(m.providers, p)
- m.startProvider(ctx, p)
-}
-
-func (m *Manager) startProvider(ctx context.Context, p *provider) {
- level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
- ctx, cancel := context.WithCancel(ctx)
- updates := make(chan []*targetgroup.Group)
-
- m.discoverCancel = append(m.discoverCancel, cancel)
-
- go p.d.Run(ctx, updates)
- go m.updater(ctx, p, updates)
-}
-
-func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
- for {
- select {
- case <-ctx.Done():
- return
- case tgs, ok := <-updates:
- m.metrics.ReceivedUpdates.Inc()
- if !ok {
- level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
- return
- }
-
- for _, s := range p.subs {
- m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
- }
-
- select {
- case m.triggerSend <- struct{}{}:
- default:
- }
- }
- }
-}
-
-func (m *Manager) sender() {
- ticker := time.NewTicker(m.updatert)
- defer ticker.Stop()
-
- for {
- select {
- case <-m.ctx.Done():
- return
- case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
- select {
- case <-m.triggerSend:
- m.metrics.SentUpdates.Inc()
- select {
- case m.syncCh <- m.allGroups():
- default:
- m.metrics.DelayedUpdates.Inc()
- level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
- select {
- case m.triggerSend <- struct{}{}:
- default:
- }
- }
- default:
- }
- }
- }
-}
-
-func (m *Manager) cancelDiscoverers() {
- for _, c := range m.discoverCancel {
- c()
- }
-}
-
-func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
- m.mtx.Lock()
- defer m.mtx.Unlock()
-
- if _, ok := m.targets[poolKey]; !ok {
- m.targets[poolKey] = make(map[string]*targetgroup.Group)
- }
- for _, tg := range tgs {
- if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
- m.targets[poolKey][tg.Source] = tg
- }
- }
-}
-
-func (m *Manager) allGroups() map[string][]*targetgroup.Group {
- m.mtx.RLock()
- defer m.mtx.RUnlock()
-
- tSets := map[string][]*targetgroup.Group{}
- n := map[string]int{}
- for pkey, tsets := range m.targets {
- for _, tg := range tsets {
- // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
- // to signal that it needs to stop all scrape loops for this target set.
- tSets[pkey.setName] = append(tSets[pkey.setName], tg)
- n[pkey.setName] += len(tg.Targets)
- }
- }
- for setName, v := range n {
- m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
- }
- return tSets
-}
-
-// registerProviders returns a number of failed SD config.
-func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int {
- var (
- failed int
- added bool
- )
- add := func(cfg discovery.Config) {
- for _, p := range m.providers {
- if reflect.DeepEqual(cfg, p.config) {
- p.subs = append(p.subs, setName)
- added = true
- return
- }
- }
- typ := cfg.Name()
- d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{
- Logger: log.With(m.logger, "discovery", typ, "config", setName),
- Metrics: m.sdMetrics[typ],
- })
- if err != nil {
- level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName)
- failed++
- return
- }
- m.providers = append(m.providers, &provider{
- name: fmt.Sprintf("%s/%d", typ, len(m.providers)),
- d: d,
- config: cfg,
- subs: []string{setName},
- })
- added = true
- }
- for _, cfg := range cfgs {
- add(cfg)
- }
- if !added {
- // Add an empty target group to force the refresh of the corresponding
- // scrape pool and to notify the receiver that this target set has no
- // current targets.
- // It can happen because the combined set of SD configurations is empty
- // or because we fail to instantiate all the SD configurations.
- add(discovery.StaticConfig{{}})
- }
- return failed
-}
-
-// StaticProvider holds a list of target groups that never change.
-type StaticProvider struct {
- TargetGroups []*targetgroup.Group
-}
-
-// Run implements the Worker interface.
-func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
- // We still have to consider that the consumer exits right away in which case
- // the context will be canceled.
- select {
- case ch <- sd.TargetGroups:
- case <-ctx.Done():
- }
- close(ch)
-}
diff --git a/discovery/legacymanager/manager_test.go b/discovery/legacymanager/manager_test.go
deleted file mode 100644
index f1be963113..0000000000
--- a/discovery/legacymanager/manager_test.go
+++ /dev/null
@@ -1,1185 +0,0 @@
-// Copyright 2016 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package legacymanager
-
-import (
- "context"
- "fmt"
- "sort"
- "strconv"
- "testing"
- "time"
-
- "github.com/go-kit/log"
- "github.com/prometheus/client_golang/prometheus"
- client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/prometheus/common/model"
- "github.com/stretchr/testify/require"
-
- "github.com/prometheus/prometheus/discovery"
- "github.com/prometheus/prometheus/discovery/targetgroup"
- "github.com/prometheus/prometheus/util/testutil"
-)
-
-func TestMain(m *testing.M) {
- testutil.TolerantVerifyLeak(m)
-}
-
-func newTestMetrics(t *testing.T, reg prometheus.Registerer) (*discovery.RefreshMetricsManager, map[string]discovery.DiscovererMetrics) {
- refreshMetrics := discovery.NewRefreshMetrics(reg)
- sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics)
- require.NoError(t, err)
- return &refreshMetrics, sdMetrics
-}
-
-// TestTargetUpdatesOrder checks that the target updates are received in the expected order.
-func TestTargetUpdatesOrder(t *testing.T) {
- // The order by which the updates are send is determined by the interval passed to the mock discovery adapter
- // Final targets array is ordered alphabetically by the name of the discoverer.
- // For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
- testCases := []struct {
- title string
- updates map[string][]update
- expectedTargets [][]*targetgroup.Group
- }{
- {
- title: "Single TP no updates",
- updates: map[string][]update{
- "tp1": {},
- },
- expectedTargets: nil,
- },
- {
- title: "Multiple TPs no updates",
- updates: map[string][]update{
- "tp1": {},
- "tp2": {},
- "tp3": {},
- },
- expectedTargets: nil,
- },
- {
- title: "Single TP empty initials",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{},
- interval: 5 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {},
- },
- },
- {
- title: "Multiple TPs empty initials",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{},
- interval: 5 * time.Millisecond,
- },
- },
- "tp2": {
- {
- targetGroups: []targetgroup.Group{},
- interval: 200 * time.Millisecond,
- },
- },
- "tp3": {
- {
- targetGroups: []targetgroup.Group{},
- interval: 100 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {},
- {},
- {},
- },
- },
- {
- title: "Single TP initials only",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- },
- {
- title: "Multiple TPs initials only",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- },
- "tp2": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- },
- interval: 10 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- }, {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- },
- },
- },
- {
- title: "Single TP initials followed by empty updates",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- interval: 0,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{},
- },
- },
- interval: 10 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{},
- },
- },
- },
- },
- {
- title: "Single TP initials and new groups",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- interval: 0,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- {
- Source: "tp1_group3",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- interval: 10 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- {
- Source: "tp1_group3",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- },
- },
- {
- title: "Multiple TPs initials and new groups",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- interval: 10 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group3",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group4",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- },
- interval: 500 * time.Millisecond,
- },
- },
- "tp2": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- },
- interval: 100 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp2_group3",
- Targets: []model.LabelSet{{"__instance__": "7"}},
- },
- {
- Source: "tp2_group4",
- Targets: []model.LabelSet{{"__instance__": "8"}},
- },
- },
- interval: 10 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- {
- Source: "tp2_group3",
- Targets: []model.LabelSet{{"__instance__": "7"}},
- },
- {
- Source: "tp2_group4",
- Targets: []model.LabelSet{{"__instance__": "8"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- {
- Source: "tp1_group3",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group4",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- {
- Source: "tp2_group3",
- Targets: []model.LabelSet{{"__instance__": "7"}},
- },
- {
- Source: "tp2_group4",
- Targets: []model.LabelSet{{"__instance__": "8"}},
- },
- },
- },
- },
- {
- title: "One TP initials arrive after other TP updates.",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- interval: 10 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- },
- interval: 150 * time.Millisecond,
- },
- },
- "tp2": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- },
- interval: 200 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "7"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "8"}},
- },
- },
- interval: 100 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "5"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "6"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- {
- Source: "tp2_group1",
- Targets: []model.LabelSet{{"__instance__": "7"}},
- },
- {
- Source: "tp2_group2",
- Targets: []model.LabelSet{{"__instance__": "8"}},
- },
- },
- },
- },
-
- {
- title: "Single TP empty update in between",
- updates: map[string][]update{
- "tp1": {
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- interval: 30 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{},
- },
- },
- interval: 10 * time.Millisecond,
- },
- {
- targetGroups: []targetgroup.Group{
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- },
- interval: 300 * time.Millisecond,
- },
- },
- },
- expectedTargets: [][]*targetgroup.Group{
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{},
- },
- },
- {
- {
- Source: "tp1_group1",
- Targets: []model.LabelSet{{"__instance__": "3"}},
- },
- {
- Source: "tp1_group2",
- Targets: []model.LabelSet{{"__instance__": "4"}},
- },
- },
- },
- },
- }
-
- for i, tc := range testCases {
- tc := tc
- t.Run(tc.title, func(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
-
- var totalUpdatesCount int
- for _, up := range tc.updates {
- if len(up) > 0 {
- totalUpdatesCount += len(up)
- }
- }
- provUpdates := make(chan []*targetgroup.Group, totalUpdatesCount)
-
- for _, up := range tc.updates {
- go newMockDiscoveryProvider(up...).Run(ctx, provUpdates)
- }
-
- for x := 0; x < totalUpdatesCount; x++ {
- select {
- case <-ctx.Done():
- t.Fatalf("%d: no update arrived within the timeout limit", x)
- case tgs := <-provUpdates:
- discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(i), provider: tc.title}, tgs)
- for _, got := range discoveryManager.allGroups() {
- assertEqualGroups(t, got, tc.expectedTargets[x])
- }
- }
- }
- })
- }
-}
-
-func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group) {
- t.Helper()
-
- // Need to sort by the groups's source as the received order is not guaranteed.
- sort.Sort(byGroupSource(got))
- sort.Sort(byGroupSource(expected))
-
- require.Equal(t, expected, got)
-}
-
-func staticConfig(addrs ...string) discovery.StaticConfig {
- var cfg discovery.StaticConfig
- for i, addr := range addrs {
- cfg = append(cfg, &targetgroup.Group{
- Source: strconv.Itoa(i),
- Targets: []model.LabelSet{
- {model.AddressLabel: model.LabelValue(addr)},
- },
- })
- }
- return cfg
-}
-
-func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
- t.Helper()
- if _, ok := tSets[poolKey]; !ok {
- t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
- }
-
- match := false
- var mergedTargets string
- for _, targetGroup := range tSets[poolKey] {
- for _, l := range targetGroup.Targets {
- mergedTargets = mergedTargets + " " + l.String()
- if l.String() == label {
- match = true
- }
- }
- }
- if match != present {
- msg := ""
- if !present {
- msg = "not"
- }
- t.Fatalf("%q should %s be present in Targets labels: %q", label, msg, mergedTargets)
- }
-}
-
-func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- c := map[string]discovery.Configs{
- "prometheus": {
- staticConfig("foo:9090", "bar:9090"),
- },
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
-
- c["prometheus"] = discovery.Configs{
- staticConfig("foo:9090"),
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
-}
-
-func TestDiscovererConfigs(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- c := map[string]discovery.Configs{
- "prometheus": {
- staticConfig("foo:9090", "bar:9090"),
- staticConfig("baz:9090"),
- },
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/1"}, "{__address__=\"baz:9090\"}", true)
-}
-
-// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after
-// removing all targets from the static_configs sends an update with empty targetGroups.
-// This is required to signal the receiver that this target set has no current targets.
-func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- c := map[string]discovery.Configs{
- "prometheus": {
- staticConfig("foo:9090"),
- },
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
-
- c["prometheus"] = discovery.Configs{
- discovery.StaticConfig{{}},
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
-
- pkey := poolKey{setName: "prometheus", provider: "static/0"}
- targetGroups, ok := discoveryManager.targets[pkey]
- if !ok {
- t.Fatalf("'%v' should be present in target groups", pkey)
- }
- group, ok := targetGroups[""]
- if !ok {
- t.Fatalf("missing '' key in target groups %v", targetGroups)
- }
-
- if len(group.Targets) != 0 {
- t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets))
- }
-}
-
-func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, nil, reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- c := map[string]discovery.Configs{
- "prometheus": {
- staticConfig("foo:9090"),
- },
- "prometheus2": {
- staticConfig("foo:9090"),
- },
- }
- discoveryManager.ApplyConfig(c)
-
- <-discoveryManager.SyncCh()
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
- verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
- if len(discoveryManager.providers) != 1 {
- t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers))
- }
-}
-
-func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) {
- originalConfig := discovery.Configs{
- staticConfig("foo:9090", "bar:9090", "baz:9090"),
- }
- processedConfig := discovery.Configs{
- staticConfig("foo:9090", "bar:9090", "baz:9090"),
- }
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- cfgs := map[string]discovery.Configs{
- "prometheus": processedConfig,
- }
- discoveryManager.ApplyConfig(cfgs)
- <-discoveryManager.SyncCh()
-
- for _, cfg := range cfgs {
- require.Equal(t, originalConfig, cfg)
- }
-}
-
-type errorConfig struct{ err error }
-
-func (e errorConfig) Name() string { return "error" }
-func (e errorConfig) NewDiscoverer(discovery.DiscovererOptions) (discovery.Discoverer, error) {
- return nil, e.err
-}
-
-// NewDiscovererMetrics implements discovery.Config.
-func (errorConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
- return &discovery.NoopDiscovererMetrics{}
-}
-
-func TestGaugeFailedConfigs(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics)
- require.NotNil(t, discoveryManager)
- discoveryManager.updatert = 100 * time.Millisecond
- go discoveryManager.Run()
-
- c := map[string]discovery.Configs{
- "prometheus": {
- errorConfig{fmt.Errorf("tests error 0")},
- errorConfig{fmt.Errorf("tests error 1")},
- errorConfig{fmt.Errorf("tests error 2")},
- },
- }
- discoveryManager.ApplyConfig(c)
- <-discoveryManager.SyncCh()
-
- failedCount := client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs)
- if failedCount != 3 {
- t.Fatalf("Expected to have 3 failed configs, got: %v", failedCount)
- }
-
- c["prometheus"] = discovery.Configs{
- staticConfig("foo:9090"),
- }
- discoveryManager.ApplyConfig(c)
- <-discoveryManager.SyncCh()
-
- failedCount = client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs)
- if failedCount != 0 {
- t.Fatalf("Expected to get no failed config, got: %v", failedCount)
- }
-}
-
-func TestCoordinationWithReceiver(t *testing.T) {
- updateDelay := 100 * time.Millisecond
-
- type expect struct {
- delay time.Duration
- tgs map[string][]*targetgroup.Group
- }
-
- testCases := []struct {
- title string
- providers map[string]discovery.Discoverer
- expected []expect
- }{
- {
- title: "Receiver should get all updates even when one provider closes its channel",
- providers: map[string]discovery.Discoverer{
- "once1": &onceProvider{
- tgs: []*targetgroup.Group{
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- },
- "mock1": newMockDiscoveryProvider(
- update{
- interval: 2 * updateDelay,
- targetGroups: []targetgroup.Group{
- {
- Source: "tg2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- ),
- },
- expected: []expect{
- {
- tgs: map[string][]*targetgroup.Group{
- "once1": {
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- },
- },
- {
- tgs: map[string][]*targetgroup.Group{
- "once1": {
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- "mock1": {
- {
- Source: "tg2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- },
- },
- },
- {
- title: "Receiver should get all updates even when the channel is blocked",
- providers: map[string]discovery.Discoverer{
- "mock1": newMockDiscoveryProvider(
- update{
- targetGroups: []targetgroup.Group{
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- },
- update{
- interval: 4 * updateDelay,
- targetGroups: []targetgroup.Group{
- {
- Source: "tg2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- ),
- },
- expected: []expect{
- {
- delay: 2 * updateDelay,
- tgs: map[string][]*targetgroup.Group{
- "mock1": {
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- },
- },
- },
- {
- delay: 4 * updateDelay,
- tgs: map[string][]*targetgroup.Group{
- "mock1": {
- {
- Source: "tg1",
- Targets: []model.LabelSet{{"__instance__": "1"}},
- },
- {
- Source: "tg2",
- Targets: []model.LabelSet{{"__instance__": "2"}},
- },
- },
- },
- },
- },
- },
- }
-
- for _, tc := range testCases {
- t.Run(tc.title, func(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- reg := prometheus.NewRegistry()
- _, sdMetrics := newTestMetrics(t, reg)
-
- mgr := NewManager(ctx, nil, reg, sdMetrics)
- require.NotNil(t, mgr)
- mgr.updatert = updateDelay
- go mgr.Run()
-
- for name, p := range tc.providers {
- mgr.StartCustomProvider(ctx, name, p)
- }
-
- for i, expected := range tc.expected {
- time.Sleep(expected.delay)
- select {
- case <-ctx.Done():
- t.Fatalf("step %d: no update received in the expected timeframe", i)
- case tgs, ok := <-mgr.SyncCh():
- if !ok {
- t.Fatalf("step %d: discovery manager channel is closed", i)
- }
- if len(tgs) != len(expected.tgs) {
- t.Fatalf("step %d: target groups mismatch, got: %d, expected: %d\ngot: %#v\nexpected: %#v",
- i, len(tgs), len(expected.tgs), tgs, expected.tgs)
- }
- for k := range expected.tgs {
- if _, ok := tgs[k]; !ok {
- t.Fatalf("step %d: target group not found: %s\ngot: %#v", i, k, tgs)
- }
- assertEqualGroups(t, tgs[k], expected.tgs[k])
- }
- }
- }
- })
- }
-}
-
-type update struct {
- targetGroups []targetgroup.Group
- interval time.Duration
-}
-
-type mockdiscoveryProvider struct {
- updates []update
-}
-
-func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider {
- tp := mockdiscoveryProvider{
- updates: updates,
- }
- return tp
-}
-
-func (tp mockdiscoveryProvider) Run(ctx context.Context, upCh chan<- []*targetgroup.Group) {
- for _, u := range tp.updates {
- if u.interval > 0 {
- select {
- case <-ctx.Done():
- return
- case <-time.After(u.interval):
- }
- }
- tgs := make([]*targetgroup.Group, len(u.targetGroups))
- for i := range u.targetGroups {
- tgs[i] = &u.targetGroups[i]
- }
- upCh <- tgs
- }
- <-ctx.Done()
-}
-
-// byGroupSource implements sort.Interface so we can sort by the Source field.
-type byGroupSource []*targetgroup.Group
-
-func (a byGroupSource) Len() int { return len(a) }
-func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source }
-
-// onceProvider sends updates once (if any) and closes the update channel.
-type onceProvider struct {
- tgs []*targetgroup.Group
-}
-
-func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) {
- if len(o.tgs) > 0 {
- ch <- o.tgs
- }
- close(ch)
-}
diff --git a/discovery/legacymanager/registry.go b/discovery/legacymanager/registry.go
deleted file mode 100644
index 955705394d..0000000000
--- a/discovery/legacymanager/registry.go
+++ /dev/null
@@ -1,261 +0,0 @@
-// Copyright 2020 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package legacymanager
-
-import (
- "errors"
- "fmt"
- "reflect"
- "sort"
- "strconv"
- "strings"
- "sync"
-
- "gopkg.in/yaml.v2"
-
- "github.com/prometheus/prometheus/discovery"
- "github.com/prometheus/prometheus/discovery/targetgroup"
-)
-
-const (
- configFieldPrefix = "AUTO_DISCOVERY_"
- staticConfigsKey = "static_configs"
- staticConfigsFieldName = configFieldPrefix + staticConfigsKey
-)
-
-var (
- configNames = make(map[string]discovery.Config)
- configFieldNames = make(map[reflect.Type]string)
- configFields []reflect.StructField
-
- configTypesMu sync.Mutex
- configTypes = make(map[reflect.Type]reflect.Type)
-
- emptyStructType = reflect.TypeOf(struct{}{})
- configsType = reflect.TypeOf(discovery.Configs{})
-)
-
-// RegisterConfig registers the given Config type for YAML marshaling and unmarshaling.
-func RegisterConfig(config discovery.Config) {
- registerConfig(config.Name()+"_sd_configs", reflect.TypeOf(config), config)
-}
-
-func init() {
- // N.B.: static_configs is the only Config type implemented by default.
- // All other types are registered at init by their implementing packages.
- elemTyp := reflect.TypeOf(&targetgroup.Group{})
- registerConfig(staticConfigsKey, elemTyp, discovery.StaticConfig{})
-}
-
-func registerConfig(yamlKey string, elemType reflect.Type, config discovery.Config) {
- name := config.Name()
- if _, ok := configNames[name]; ok {
- panic(fmt.Sprintf("discovery: Config named %q is already registered", name))
- }
- configNames[name] = config
-
- fieldName := configFieldPrefix + yamlKey // Field must be exported.
- configFieldNames[elemType] = fieldName
-
- // Insert fields in sorted order.
- i := sort.Search(len(configFields), func(k int) bool {
- return fieldName < configFields[k].Name
- })
- configFields = append(configFields, reflect.StructField{}) // Add empty field at end.
- copy(configFields[i+1:], configFields[i:]) // Shift fields to the right.
- configFields[i] = reflect.StructField{ // Write new field in place.
- Name: fieldName,
- Type: reflect.SliceOf(elemType),
- Tag: reflect.StructTag(`yaml:"` + yamlKey + `,omitempty"`),
- }
-}
-
-func getConfigType(out reflect.Type) reflect.Type {
- configTypesMu.Lock()
- defer configTypesMu.Unlock()
- if typ, ok := configTypes[out]; ok {
- return typ
- }
- // Initial exported fields map one-to-one.
- var fields []reflect.StructField
- for i, n := 0, out.NumField(); i < n; i++ {
- switch field := out.Field(i); {
- case field.PkgPath == "" && field.Type != configsType:
- fields = append(fields, field)
- default:
- fields = append(fields, reflect.StructField{
- Name: "_" + field.Name, // Field must be unexported.
- PkgPath: out.PkgPath(),
- Type: emptyStructType,
- })
- }
- }
- // Append extra config fields on the end.
- fields = append(fields, configFields...)
- typ := reflect.StructOf(fields)
- configTypes[out] = typ
- return typ
-}
-
-// UnmarshalYAMLWithInlineConfigs helps implement yaml.Unmarshal for structs
-// that have a Configs field that should be inlined.
-func UnmarshalYAMLWithInlineConfigs(out interface{}, unmarshal func(interface{}) error) error {
- outVal := reflect.ValueOf(out)
- if outVal.Kind() != reflect.Ptr {
- return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
- }
- outVal = outVal.Elem()
- if outVal.Kind() != reflect.Struct {
- return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
- }
- outTyp := outVal.Type()
-
- cfgTyp := getConfigType(outTyp)
- cfgPtr := reflect.New(cfgTyp)
- cfgVal := cfgPtr.Elem()
-
- // Copy shared fields (defaults) to dynamic value.
- var configs *discovery.Configs
- for i, n := 0, outVal.NumField(); i < n; i++ {
- if outTyp.Field(i).Type == configsType {
- configs = outVal.Field(i).Addr().Interface().(*discovery.Configs)
- continue
- }
- if cfgTyp.Field(i).PkgPath != "" {
- continue // Field is unexported: ignore.
- }
- cfgVal.Field(i).Set(outVal.Field(i))
- }
- if configs == nil {
- return fmt.Errorf("discovery: Configs field not found in type: %T", out)
- }
-
- // Unmarshal into dynamic value.
- if err := unmarshal(cfgPtr.Interface()); err != nil {
- return replaceYAMLTypeError(err, cfgTyp, outTyp)
- }
-
- // Copy shared fields from dynamic value.
- for i, n := 0, outVal.NumField(); i < n; i++ {
- if cfgTyp.Field(i).PkgPath != "" {
- continue // Field is unexported: ignore.
- }
- outVal.Field(i).Set(cfgVal.Field(i))
- }
-
- var err error
- *configs, err = readConfigs(cfgVal, outVal.NumField())
- return err
-}
-
-func readConfigs(structVal reflect.Value, startField int) (discovery.Configs, error) {
- var (
- configs discovery.Configs
- targets []*targetgroup.Group
- )
- for i, n := startField, structVal.NumField(); i < n; i++ {
- field := structVal.Field(i)
- if field.Kind() != reflect.Slice {
- panic("discovery: internal error: field is not a slice")
- }
- for k := 0; k < field.Len(); k++ {
- val := field.Index(k)
- if val.IsZero() || (val.Kind() == reflect.Ptr && val.Elem().IsZero()) {
- key := configFieldNames[field.Type().Elem()]
- key = strings.TrimPrefix(key, configFieldPrefix)
- return nil, fmt.Errorf("empty or null section in %s", key)
- }
- switch c := val.Interface().(type) {
- case *targetgroup.Group:
- // Add index to the static config target groups for unique identification
- // within scrape pool.
- c.Source = strconv.Itoa(len(targets))
- // Coalesce multiple static configs into a single static config.
- targets = append(targets, c)
- case discovery.Config:
- configs = append(configs, c)
- default:
- panic("discovery: internal error: slice element is not a Config")
- }
- }
- }
- if len(targets) > 0 {
- configs = append(configs, discovery.StaticConfig(targets))
- }
- return configs, nil
-}
-
-// MarshalYAMLWithInlineConfigs helps implement yaml.Marshal for structs
-// that have a Configs field that should be inlined.
-func MarshalYAMLWithInlineConfigs(in interface{}) (interface{}, error) {
- inVal := reflect.ValueOf(in)
- for inVal.Kind() == reflect.Ptr {
- inVal = inVal.Elem()
- }
- inTyp := inVal.Type()
-
- cfgTyp := getConfigType(inTyp)
- cfgPtr := reflect.New(cfgTyp)
- cfgVal := cfgPtr.Elem()
-
- // Copy shared fields to dynamic value.
- var configs *discovery.Configs
- for i, n := 0, inTyp.NumField(); i < n; i++ {
- if inTyp.Field(i).Type == configsType {
- configs = inVal.Field(i).Addr().Interface().(*discovery.Configs)
- }
- if cfgTyp.Field(i).PkgPath != "" {
- continue // Field is unexported: ignore.
- }
- cfgVal.Field(i).Set(inVal.Field(i))
- }
- if configs == nil {
- return nil, fmt.Errorf("discovery: Configs field not found in type: %T", in)
- }
-
- if err := writeConfigs(cfgVal, *configs); err != nil {
- return nil, err
- }
-
- return cfgPtr.Interface(), nil
-}
-
-func writeConfigs(structVal reflect.Value, configs discovery.Configs) error {
- targets := structVal.FieldByName(staticConfigsFieldName).Addr().Interface().(*[]*targetgroup.Group)
- for _, c := range configs {
- if sc, ok := c.(discovery.StaticConfig); ok {
- *targets = append(*targets, sc...)
- continue
- }
- fieldName, ok := configFieldNames[reflect.TypeOf(c)]
- if !ok {
- return fmt.Errorf("discovery: cannot marshal unregistered Config type: %T", c)
- }
- field := structVal.FieldByName(fieldName)
- field.Set(reflect.Append(field, reflect.ValueOf(c)))
- }
- return nil
-}
-
-func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error {
- var e *yaml.TypeError
- if errors.As(err, &e) {
- oldStr := oldTyp.String()
- newStr := newTyp.String()
- for i, s := range e.Errors {
- e.Errors[i] = strings.ReplaceAll(s, oldStr, newStr)
- }
- }
- return err
-}
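
The registry deleted here mirrors the one that lives in the discovery package itself; its central trick is assembling a struct type at runtime with reflect.StructOf so that every registered SD config contributes its own YAML key. A standalone illustration of that technique follows; the field name echoes the deleted staticConfigsFieldName constant, and the sample data is invented:

package main

import (
	"fmt"
	"reflect"

	"gopkg.in/yaml.v2"
)

func main() {
	// Build the struct type at runtime, as registerConfig does when it
	// appends to configFields: one exported, yaml-tagged field per key.
	typ := reflect.StructOf([]reflect.StructField{{
		Name: "AUTO_DISCOVERY_static_configs", // must be exported for yaml
		Type: reflect.TypeOf([]string(nil)),
		Tag:  reflect.StructTag(`yaml:"static_configs,omitempty"`),
	}})

	ptr := reflect.New(typ) // pointer to the freshly built struct type
	if err := yaml.Unmarshal([]byte(`static_configs: ["foo:9090", "bar:9090"]`), ptr.Interface()); err != nil {
		panic(err)
	}
	fmt.Println(ptr.Elem().Field(0).Interface()) // [foo:9090 bar:9090]
}
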
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index d637312a30..f16fcbd7b5 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -55,7 +55,7 @@ The Prometheus monitoring server
| --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
-| --enable-feature ... | Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
+| --enable-feature ... | Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| --agent | Run Prometheus in 'Agent mode'. | |
| --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` |
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index 1a8908548c..f027ffce6c 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -47,20 +47,6 @@ When enabled, for each instance scrape, Prometheus stores a sample in the follow
to find out how close they are to reaching the limit with `scrape_samples_post_metric_relabeling / scrape_sample_limit`. Note that `scrape_sample_limit` can be zero if there is no limit configured, which means that the query above can return `+Inf` for targets with no limit (as we divide by zero). If you want to query only for targets that do have a sample limit use this query: `scrape_samples_post_metric_relabeling / (scrape_sample_limit > 0)`.
- `scrape_body_size_bytes`. The uncompressed size of the most recent scrape response, if successful. Scrapes failing because `body_size_limit` is exceeded report `-1`, other scrape failures report `0`.
-## New service discovery manager
-
-`--enable-feature=new-service-discovery-manager`
-
-When enabled, Prometheus uses a new service discovery manager that does not
-restart unchanged discoveries upon reloading. This makes reloads faster and reduces
-pressure on service discoveries' sources.
-
-Users are encouraged to test the new service discovery manager and report any
-issues upstream.
-
-In future releases, this new service discovery manager will become the default and
-this feature flag will be ignored.
-
## Prometheus agent
`--enable-feature=agent`