From e405e2f1ea2156f76a7e70f5471b88a590a8e24a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 25 Nov 2017 13:13:54 +0000 Subject: [PATCH 01/13] refactored discovery --- cmd/prometheus/main.go | 45 ++- config/config.go | 2 +- discovery/discovery.go | 319 ------------------ discovery/manager.go | 293 ++++++++++++++++ .../{discovery_test.go => manager_test.go} | 0 notifier/notifier.go | 16 - retrieval/manager.go | 152 +++++++++ ...{targetmanager_test.go => manager_test.go} | 0 retrieval/targetmanager.go | 194 ----------- web/web.go | 12 +- 10 files changed, 481 insertions(+), 552 deletions(-) delete mode 100644 discovery/discovery.go create mode 100644 discovery/manager.go rename discovery/{discovery_test.go => manager_test.go} (100%) create mode 100644 retrieval/manager.go rename retrieval/{targetmanager_test.go => manager_test.go} (100%) delete mode 100644 retrieval/targetmanager.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b03db5891f..132605c365 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -43,6 +43,7 @@ import ( "github.com/prometheus/common/promlog" promlogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" @@ -230,12 +231,17 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager")) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ctx, cancelCtx = context.WithCancel(context.Background()) + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) + discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + ctxScrape, cancelScrape = context.WithCancel(context.Background()) + scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ctxWeb, cancelWeb = context.WithCancel(context.Background()) + webHandler = web.New(log.With(logger, "component", "web"), &cfg.web) ) + ctx := context.Background() ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, QueryFunc: rules.EngineQueryFunc(queryEngine), @@ -250,7 +256,7 @@ func main() { cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine - cfg.web.TargetManager = targetManager + cfg.web.ScrapeManager = scrapeManager cfg.web.RuleManager = ruleManager cfg.web.Notifier = notifier @@ -268,8 +274,6 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } - webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) - // Monitor outgoing connections on default transport with conntrack. 
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc( conntrack.DialWithTracing(), @@ -306,6 +310,17 @@ func main() { var g group.Group { + g.Add( + func() error { + err := discoveryManager.Run() + level.Info(logger).Log("msg", "Discovery manager stopped") + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping discovery manager...") + cancelDiscovery() + }, + ) term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM) cancel := make(chan struct{}) @@ -426,7 +441,7 @@ func main() { { g.Add( func() error { - if err := webHandler.Run(ctx); err != nil { + if err := webHandler.Run(ctxWeb); err != nil { return fmt.Errorf("Error starting web server: %s", err) } return nil @@ -435,7 +450,7 @@ func main() { // Keep this interrupt before the ruleManager.Stop(). // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - cancelCtx() + cancelWeb() }, ) } @@ -468,17 +483,15 @@ func main() { ) } { - // TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel. - cancel := make(chan struct{}) g.Add( func() error { - targetManager.Run() - <-cancel - return nil + err := scrapeManager.Run(discoveryManager.SyncCh()) + level.Info(logger).Log("msg", "Scrape manager stopped") + return err }, func(err error) { - targetManager.Stop() - close(cancel) + level.Info(logger).Log("msg", "Stopping scrape manager...") + cancelScrape() }, ) } diff --git a/config/config.go b/config/config.go index ab4208008d..73a938bca3 100644 --- a/config/config.go +++ b/config/config.go @@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error { return checkOverflow(a.XXX, "basic_auth") } -// TargetGroup is a set of targets with a common label set. +// TargetGroup is a set of targets with a common label set(production , test, staging etc.). type TargetGroup struct { // Targets is a list of targets identified by a label set. Each target is // uniquely identifiable in the group by its address label. diff --git a/discovery/discovery.go b/discovery/discovery.go deleted file mode 100644 index 9cf5f8190f..0000000000 --- a/discovery/discovery.go +++ /dev/null @@ -1,319 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package discovery - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/azure" - "github.com/prometheus/prometheus/discovery/consul" - "github.com/prometheus/prometheus/discovery/dns" - "github.com/prometheus/prometheus/discovery/ec2" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/gce" - "github.com/prometheus/prometheus/discovery/kubernetes" - "github.com/prometheus/prometheus/discovery/marathon" - "github.com/prometheus/prometheus/discovery/openstack" - "github.com/prometheus/prometheus/discovery/triton" - "github.com/prometheus/prometheus/discovery/zookeeper" -) - -// A TargetProvider provides information about target groups. It maintains a set -// of sources from which TargetGroups can originate. Whenever a target provider -// detects a potential change, it sends the TargetGroup through its provided channel. -// -// The TargetProvider does not have to guarantee that an actual change happened. -// It does guarantee that it sends the new TargetGroup whenever a change happens. -// -// TargetProviders should initially send a full set of all discoverable TargetGroups. -type TargetProvider interface { - // Run hands a channel to the target provider through which it can send - // updated target groups. - // Must returns if the context gets canceled. It should not close the update - // channel on returning. - Run(ctx context.Context, up chan<- []*config.TargetGroup) -} - -// ProvidersFromConfig returns all TargetProviders configured in cfg. -func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider { - providers := map[string]TargetProvider{} - - app := func(mech string, i int, tp TargetProvider) { - providers[fmt.Sprintf("%s/%d", mech, i)] = tp - } - - for i, c := range cfg.DNSSDConfigs { - app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns"))) - } - for i, c := range cfg.FileSDConfigs { - app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file"))) - } - for i, c := range cfg.ConsulSDConfigs { - k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err) - continue - } - app("consul", i, k) - } - for i, c := range cfg.MarathonSDConfigs { - m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err) - continue - } - app("marathon", i, m) - } - for i, c := range cfg.KubernetesSDConfigs { - k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) - continue - } - app("kubernetes", i, k) - } - for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper"))) - } - for i, c := range cfg.NerveSDConfigs { - app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve"))) - } - for i, c := range cfg.EC2SDConfigs { - app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2"))) - } - for i, c := range cfg.OpenstackSDConfigs { - openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack")) - if err != nil { - level.Error(logger).Log("msg", "Cannot 
initialize OpenStack discovery", "err", err) - continue - } - app("openstack", i, openstackd) - } - - for i, c := range cfg.GCESDConfigs { - gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce")) - if err != nil { - level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err) - continue - } - app("gce", i, gced) - } - for i, c := range cfg.AzureSDConfigs { - app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure"))) - } - for i, c := range cfg.TritonSDConfigs { - t, err := triton.New(log.With(logger, "discovery", "trition"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err) - continue - } - app("triton", i, t) - } - if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) - } - - return providers -} - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*config.TargetGroup -} - -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - -// Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. - select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} - -// TargetSet handles multiple TargetProviders and sends a full overview of their -// discovered TargetGroups to a Syncer. -type TargetSet struct { - mtx sync.RWMutex - // Sets of targets by a source string that is unique across target providers. - tgroups map[string]*config.TargetGroup - - syncer Syncer - - syncCh chan struct{} - providerCh chan map[string]TargetProvider - cancelProviders func() -} - -// Syncer receives updates complete sets of TargetGroups. -type Syncer interface { - Sync([]*config.TargetGroup) -} - -// NewTargetSet returns a new target sending TargetGroups to the Syncer. -func NewTargetSet(s Syncer) *TargetSet { - return &TargetSet{ - syncCh: make(chan struct{}, 1), - providerCh: make(chan map[string]TargetProvider), - syncer: s, - } -} - -// Run starts the processing of target providers and their updates. -// It blocks until the context gets canceled. -func (ts *TargetSet) Run(ctx context.Context) { -Loop: - for { - // Throttle syncing to once per five seconds. - select { - case <-ctx.Done(): - break Loop - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - case <-time.After(5 * time.Second): - } - - select { - case <-ctx.Done(): - break Loop - case <-ts.syncCh: - ts.sync() - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - } - } -} - -func (ts *TargetSet) sync() { - ts.mtx.RLock() - var all []*config.TargetGroup - for _, tg := range ts.tgroups { - all = append(all, tg) - } - ts.mtx.RUnlock() - - ts.syncer.Sync(all) -} - -// UpdateProviders sets new target providers for the target set. -func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) { - ts.providerCh <- p -} - -func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) { - - // Stop all previous target providers of the target set. 
- if ts.cancelProviders != nil { - ts.cancelProviders() - } - ctx, ts.cancelProviders = context.WithCancel(ctx) - - var wg sync.WaitGroup - // (Re-)create a fresh tgroups map to not keep stale targets around. We - // will retrieve all targets below anyway, so cleaning up everything is - // safe and doesn't inflict any additional cost. - ts.mtx.Lock() - ts.tgroups = map[string]*config.TargetGroup{} - ts.mtx.Unlock() - - for name, prov := range providers { - wg.Add(1) - - updates := make(chan []*config.TargetGroup) - go prov.Run(ctx, updates) - - go func(name string, prov TargetProvider) { - select { - case <-ctx.Done(): - case initial, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - // First set of all targets the provider knows. - for _, tgroup := range initial { - ts.setTargetGroup(name, tgroup) - } - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - wg.Done() - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - for _, tg := range tgs { - ts.update(name, tg) - } - } - } - }(name, prov) - } - - // We wait for a full initial set of target groups before releasing the mutex - // to ensure the initial sync is complete and there are no races with subsequent updates. - wg.Wait() - // Just signal that there are initial sets to sync now. Actual syncing must only - // happen in the runScraping loop. - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -// update handles a target group update from a target provider identified by the name. -func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) { - ts.setTargetGroup(name, tgroup) - - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) { - ts.mtx.Lock() - defer ts.mtx.Unlock() - - if tg == nil { - return - } - ts.tgroups[name+"/"+tg.Source] = tg -} diff --git a/discovery/manager.go b/discovery/manager.go new file mode 100644 index 0000000000..7119c54705 --- /dev/null +++ b/discovery/manager.go @@ -0,0 +1,293 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package discovery + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/prometheus/config" + + "github.com/prometheus/prometheus/discovery/azure" + "github.com/prometheus/prometheus/discovery/consul" + "github.com/prometheus/prometheus/discovery/dns" + "github.com/prometheus/prometheus/discovery/ec2" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/gce" + "github.com/prometheus/prometheus/discovery/kubernetes" + "github.com/prometheus/prometheus/discovery/marathon" + "github.com/prometheus/prometheus/discovery/openstack" + "github.com/prometheus/prometheus/discovery/triton" + "github.com/prometheus/prometheus/discovery/zookeeper" +) + +// DiscoveryProvider provides information about target groups. It maintains a set +// of sources from which TargetGroups can originate. Whenever a discovery provider +// detects a potential change, it sends the TargetGroup through its provided channel. +// +// The DiscoveryProvider does not have to guarantee that an actual change happened. +// It does guarantee that it sends the new TargetGroup whenever a change happens. +// +// DiscoveryProviders should initially send a full set of all discoverable TargetGroups. +type DiscoveryProvider interface { + // Run hands a channel to the discovery provider(consul,dns etc) through which it can send + // updated target groups. + // Must returns if the context gets canceled. It should not close the update + // channel on returning. + Run(ctx context.Context, up chan<- []*config.TargetGroup) +} + +type targetSetProvider struct { + cancel func() + tgroups []*config.TargetGroup +} + +// NewDiscoveryManager is the DiscoveryManager constructor +func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager { + return &DiscoveryManager{ + ctx: ctx, + logger: logger, + actionCh: make(chan func()), + syncCh: make(chan map[string][]*config.TargetGroup), + targetSetProviders: make(map[string]map[string]*targetSetProvider), + } +} + +// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages. +type DiscoveryManager struct { + ctx context.Context + logger log.Logger + syncCh chan map[string][]*config.TargetGroup // map[targetSetName] + actionCh chan func() + targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName] +} + +// Run starts the background processing +func (m *DiscoveryManager) Run() error { + for { + select { + case f := <-m.actionCh: + f() + case <-m.ctx.Done(): + return m.ctx.Err() + } + } + +} + +// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates +func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup { + return m.syncCh +} + +// ApplyConfig removes all running discovery providers and starts new ones using the provided config. +func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { + err := make(chan error) + m.actionCh <- func() { + m.cancelDiscoveryProviders() + for _, scfg := range cfg.ScrapeConfigs { + for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { + ctx, cancel := context.WithCancel(m.ctx) + updates := make(chan []*config.TargetGroup) + + m.createProvider(cancel, scfg.JobName, provName) + + go prov.Run(ctx, updates) + go func(provName string) { + select { + case <-ctx.Done(): + // First set of all targets the provider knows. 
+ case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + break + } + m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) + } + } + }(provName) + } + } + close(err) + } + + return <-err +} + +func (m *DiscoveryManager) cancelDiscoveryProviders() { + for targetSetName, targetSetProviders := range m.targetSetProviders { + for _, discoveryProvider := range targetSetProviders { + discoveryProvider.cancel() + } + delete(m.targetSetProviders, targetSetName) + } +} + +func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) { + if m.targetSetProviders[tsName] == nil { + m.targetSetProviders[tsName] = make(map[string]*targetSetProvider) + } + m.targetSetProviders[tsName][provName] = &targetSetProvider{ + cancel: cancel, + tgroups: []*config.TargetGroup{}, + } +} + +// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set +func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { + tset := make(chan map[string][]*config.TargetGroup) + m.actionCh <- func() { + if tg != nil { + m.targetSetProviders[tsName][provName].tgroups = tg + } + var tgAll []*config.TargetGroup + for _, prov := range m.targetSetProviders[tsName] { + for _, tg := range prov.tgroups { + tgAll = append(tgAll, tg) + } + } + t := make(map[string][]*config.TargetGroup) + t[tsName] = tgAll + tset <- t + } + return <-tset +} + +func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider { + providers := map[string]DiscoveryProvider{} + + app := func(mech string, i int, tp DiscoveryProvider) { + providers[fmt.Sprintf("%s/%d", mech, i)] = tp + } + + for i, c := range cfg.DNSSDConfigs { + app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns"))) + } + for i, c := range cfg.FileSDConfigs { + app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file"))) + } + for i, c := range cfg.ConsulSDConfigs { + k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err) + continue + } + app("consul", i, k) + } + for i, c := range cfg.MarathonSDConfigs { + t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err) + continue + } + app("marathon", i, t) + } + for i, c := range cfg.KubernetesSDConfigs { + k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) + continue + } + app("kubernetes", i, k) + } + for i, c := range cfg.ServersetSDConfigs { + app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper"))) + } + for i, c := range cfg.NerveSDConfigs { + app("nerve", i, 
zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve"))) + } + for i, c := range cfg.EC2SDConfigs { + app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2"))) + } + for i, c := range cfg.OpenstackSDConfigs { + openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err) + continue + } + app("openstack", i, openstackd) + } + + for i, c := range cfg.GCESDConfigs { + gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err) + continue + } + app("gce", i, gced) + } + for i, c := range cfg.AzureSDConfigs { + app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure"))) + } + for i, c := range cfg.TritonSDConfigs { + t, err := triton.New(log.With(m.logger, "discovery", "trition"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err) + continue + } + app("triton", i, t) + } + if len(cfg.StaticConfigs) > 0 { + app("static", 0, NewStaticProvider(cfg.StaticConfigs)) + } + + return providers +} + +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*config.TargetGroup +} + +// NewStaticProvider returns a StaticProvider configured with the given +// target groups. +func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { + for i, tg := range groups { + tg.Source = fmt.Sprintf("%d", i) + } + return &StaticProvider{groups} +} + +// Run implements the DiscoveryProvider interface. +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): + } + close(ch) +} diff --git a/discovery/discovery_test.go b/discovery/manager_test.go similarity index 100% rename from discovery/discovery_test.go rename to discovery/manager_test.go diff --git a/notifier/notifier.go b/notifier/notifier.go index 36e63fd6e7..d2c14b7655 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -35,7 +35,6 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/util/httputil" @@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs amSets := []*alertmanagerSet{} - ctx, cancel := context.WithCancel(n.ctx) for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs { ams, err := newAlertmanagerSet(cfg, n.logger) @@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { amSets = append(amSets, ams) } - // After all sets were created successfully, start them and cancel the - // old ones. 
- for _, ams := range amSets { - go ams.ts.Run(ctx) - ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger)) - } - if n.cancelDiscovery != nil { - n.cancelDiscovery() - } - - n.cancelDiscovery = cancel n.alertmanagers = amSets return nil @@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL { // alertmanagerSet contains a set of Alertmanagers discovered via a group of service // discovery definitions that have a common configuration on how alerts should be sent. type alertmanagerSet struct { - ts *discovery.TargetSet cfg *config.AlertmanagerConfig client *http.Client @@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale cfg: cfg, logger: logger, } - s.ts = discovery.NewTargetSet(s) - return s, nil } diff --git a/retrieval/manager.go b/retrieval/manager.go new file mode 100644 index 0000000000..694d14b97b --- /dev/null +++ b/retrieval/manager.go @@ -0,0 +1,152 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "context" + "fmt" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" +) + +// Appendable returns an Appender. +type Appendable interface { + Appender() (storage.Appender, error) +} + +// NewScrapeManager is the ScrapeManager constructor +func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager { + + return &ScrapeManager{ + ctx: ctx, + append: app, + logger: logger, + actionCh: make(chan func()), + scrapeConfigs: make(map[string]*config.ScrapeConfig), + scrapePools: make(map[string]*scrapePool), + } +} + +// ScrapeManager maintains a set of scrape pools and manages start/stop cicles +// when receiving new target groups form the discovery manager. +type ScrapeManager struct { + ctx context.Context + logger log.Logger + append Appendable + scrapeConfigs map[string]*config.ScrapeConfig + scrapePools map[string]*scrapePool + actionCh chan func() +} + +// Run starts background processing to handle target updates and reload the scraping loops. +func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error { + level.Info(m.logger).Log("msg", "Starting scrape manager...") + + for { + select { + case f := <-m.actionCh: + f() + case ts := <-tsets: + m.reload(ts) + case <-m.ctx.Done(): + return m.ctx.Err() + } + } +} + +// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. +func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { + done := make(chan struct{}) + m.actionCh <- func() { + for _, scfg := range cfg.ScrapeConfigs { + m.scrapeConfigs[scfg.JobName] = scfg + } + close(done) + } + <-done + return nil +} + +// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. 
+func (tm *TargetManager) TargetMap() map[string][]*Target { + tm.mtx.RLock() + defer tm.mtx.RUnlock() + + targetsMap := make(map[string][]*Target) + for jobName, ps := range tm.targetSets { + ps.sp.mtx.RLock() + for _, t := range ps.sp.targets { + targetsMap[jobName] = append(targetsMap[jobName], t) + } + targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...) + ps.sp.mtx.RUnlock() + } + return targetsMap +} + +// Targets returns the targets currently being scraped. +func (m *ScrapeManager) Targets() []*Target { + targets := make(chan []*Target) + m.actionCh <- func() { + var t []*Target + for _, p := range m.scrapePools { + p.mtx.RLock() + for _, tt := range p.targets { + t = append(t, tt) + } + p.mtx.RUnlock() + } + targets <- t + } + return <-targets +} + +func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { + for tsetName, tgroup := range t { + scrapeConfig, ok := m.scrapeConfigs[tsetName] + if !ok { + return fmt.Errorf("target set '%v' doesn't have valid config", tsetName) + } + + // scrape pool doesn't exist so start a new one + existing, ok := m.scrapePools[tsetName] + if !ok { + sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + m.scrapePools[tsetName] = sp + sp.Sync(tgroup) + + } else { + existing.Sync(tgroup) + } + + // cleanup - check config and cancel the scrape loops if it don't exist in the scrape config + jobs := make(map[string]struct{}) + + for k := range m.scrapeConfigs { + jobs[k] = struct{}{} + } + + for name, sp := range m.scrapePools { + if _, ok := jobs[name]; !ok { + sp.stop() + delete(m.scrapePools, name) + } + } + } + return nil +} diff --git a/retrieval/targetmanager_test.go b/retrieval/manager_test.go similarity index 100% rename from retrieval/targetmanager_test.go rename to retrieval/manager_test.go diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go deleted file mode 100644 index f826406642..0000000000 --- a/retrieval/targetmanager.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package retrieval - -import ( - "context" - "sync" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/storage" -) - -// TargetManager maintains a set of targets, starts and stops their scraping and -// creates the new targets based on the target groups it receives from various -// target providers. -type TargetManager struct { - append Appendable - scrapeConfigs []*config.ScrapeConfig - - mtx sync.RWMutex - ctx context.Context - cancel func() - wg sync.WaitGroup - - // Set of unqiue targets by scrape configuration. - targetSets map[string]*targetSet - logger log.Logger - starting chan struct{} -} - -type targetSet struct { - ctx context.Context - cancel func() - - ts *discovery.TargetSet - sp *scrapePool -} - -// Appendable returns an Appender. 
-type Appendable interface { - Appender() (storage.Appender, error) -} - -// NewTargetManager creates a new TargetManager. -func NewTargetManager(app Appendable, logger log.Logger) *TargetManager { - return &TargetManager{ - append: app, - targetSets: map[string]*targetSet{}, - logger: logger, - starting: make(chan struct{}), - } -} - -// Run starts background processing to handle target updates. -func (tm *TargetManager) Run() { - level.Info(tm.logger).Log("msg", "Starting target manager...") - - tm.mtx.Lock() - - tm.ctx, tm.cancel = context.WithCancel(context.Background()) - tm.reload() - - tm.mtx.Unlock() - close(tm.starting) - - tm.wg.Wait() -} - -// Stop all background processing. -func (tm *TargetManager) Stop() { - <-tm.starting - level.Info(tm.logger).Log("msg", "Stopping target manager...") - - tm.mtx.Lock() - // Cancel the base context, this will cause all target providers to shut down - // and all in-flight scrapes to abort immmediately. - // Started inserts will be finished before terminating. - tm.cancel() - tm.mtx.Unlock() - - // Wait for all scrape inserts to complete. - tm.wg.Wait() - - level.Info(tm.logger).Log("msg", "Target manager stopped") -} - -func (tm *TargetManager) reload() { - jobs := map[string]struct{}{} - - // Start new target sets and update existing ones. - for _, scfg := range tm.scrapeConfigs { - jobs[scfg.JobName] = struct{}{} - - ts, ok := tm.targetSets[scfg.JobName] - if !ok { - ctx, cancel := context.WithCancel(tm.ctx) - ts = &targetSet{ - ctx: ctx, - cancel: cancel, - sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)), - } - ts.ts = discovery.NewTargetSet(ts.sp) - - tm.targetSets[scfg.JobName] = ts - - tm.wg.Add(1) - - go func(ts *targetSet) { - // Run target set, which blocks until its context is canceled. - // Gracefully shut down pending scrapes in the scrape pool afterwards. - ts.ts.Run(ctx) - ts.sp.stop() - tm.wg.Done() - }(ts) - } else { - ts.sp.reload(scfg) - } - ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger)) - } - - // Remove old target sets. Waiting for scrape pools to complete pending - // scrape inserts is already guaranteed by the goroutine that started the target set. - for name, ts := range tm.targetSets { - if _, ok := jobs[name]; !ok { - ts.cancel() - delete(tm.targetSets, name) - } - } -} - -// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (tm *TargetManager) TargetMap() map[string][]*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targetsMap := make(map[string][]*Target) - for jobName, ps := range tm.targetSets { - ps.sp.mtx.RLock() - for _, t := range ps.sp.targets { - targetsMap[jobName] = append(targetsMap[jobName], t) - } - targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...) - ps.sp.mtx.RUnlock() - } - return targetsMap -} - -// Targets returns the targets currently being scraped. -func (tm *TargetManager) Targets() []*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targets := []*Target{} - for _, ps := range tm.targetSets { - ps.sp.mtx.RLock() - - for _, t := range ps.sp.targets { - targets = append(targets, t) - } - - ps.sp.mtx.RUnlock() - } - - return targets -} - -// ApplyConfig resets the manager's target providers and job configurations as defined -// by the new cfg. The state of targets that are valid in the new configuration remains unchanged. 
-func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - tm.scrapeConfigs = cfg.ScrapeConfigs - - if tm.ctx != nil { - tm.reload() - } - return nil -} diff --git a/web/web.go b/web/web.go index d9875150e6..6dfe67a399 100644 --- a/web/web.go +++ b/web/web.go @@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"} type Handler struct { logger log.Logger - targetManager *retrieval.TargetManager + scrapeManager *retrieval.ScrapeManager ruleManager *rules.Manager queryEngine *promql.Engine context context.Context @@ -125,7 +125,7 @@ type Options struct { TSDB func() *tsdb.DB Storage storage.Storage QueryEngine *promql.Engine - TargetManager *retrieval.TargetManager + ScrapeManager *retrieval.ScrapeManager RuleManager *rules.Manager Notifier *notifier.Notifier Version *PrometheusVersion @@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler { flagsMap: o.Flags, context: o.Context, - targetManager: o.TargetManager, + scrapeManager: o.ScrapeManager, ruleManager: o.RuleManager, queryEngine: o.QueryEngine, tsdb: o.TSDB, @@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler { ready: 0, } - h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier, + h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier, func() config.Config { h.mtx.RLock() defer h.mtx.RUnlock() @@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error { h.options.QueryEngine, h.options.Storage.Querier, func() []*retrieval.Target { - return h.options.TargetManager.Targets() + return h.options.ScrapeManager.Targets() }, func() []*url.URL { return h.options.Notifier.Alertmanagers() @@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := map[string][]*retrieval.Target{} - for _, t := range h.targetManager.Targets() { + for _, t := range h.scrapeManager.Targets() { job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) } From 9c61f0e8a0c1d4f83d06e003af610a7d671287b2 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 26 Nov 2017 15:15:15 +0000 Subject: [PATCH 02/13] scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage. 
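The core idea of this change, as a minimal standalone sketch (illustrative names only, not the patch's own scrape pool code): a loop owns a quit and a done channel, and stop() returns only once the loop has exited, so the caller can safely close the storage afterwards.

package main

import (
	"fmt"
	"time"
)

// scrapeLoop stands in for a real scrape loop: stop blocks until run has
// returned, so nothing is appended to storage after stop returns.
type scrapeLoop struct {
	quit chan struct{}
	done chan struct{}
}

func newScrapeLoop() *scrapeLoop {
	return &scrapeLoop{quit: make(chan struct{}), done: make(chan struct{})}
}

func (l *scrapeLoop) run(interval time.Duration) {
	defer close(l.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-l.quit:
			return
		case <-ticker.C:
			fmt.Println("scrape and append samples")
		}
	}
}

// stop signals the loop and waits for it to finish its in-flight iteration.
func (l *scrapeLoop) stop() {
	close(l.quit)
	<-l.done
}

func main() {
	l := newScrapeLoop()
	go l.run(100 * time.Millisecond)
	time.Sleep(250 * time.Millisecond)
	l.stop()
	fmt.Println("scraping stopped; the TSDB can now be closed safely")
}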
--- cmd/prometheus/main.go | 80 ++++++++++++++++++++++++---------------- discovery/manager.go | 1 - retrieval/manager.go | 25 ++++++++----- retrieval/scrape.go | 14 +++---- retrieval/scrape_test.go | 4 +- 5 files changed, 73 insertions(+), 51 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 132605c365..c60f5150b9 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -231,16 +231,24 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) - discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) - ctxScrape, cancelScrape = context.WithCancel(context.Background()) - scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ctxWeb, cancelWeb = context.WithCancel(context.Background()) - webHandler = web.New(log.With(logger, "component", "web"), &cfg.web) + ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) + ctxRule = context.Background() + + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, + Notifier: notifier, + QueryEngine: queryEngine, + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Logger: log.With(logger, "component", "rule manager"), + }) ) +<<<<<<< HEAD ctx := context.Background() ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, @@ -253,6 +261,9 @@ func main() { }) cfg.web.Context = ctx +======= + cfg.web.Context = ctxWeb +>>>>>>> 95b1dec3... scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage. cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine @@ -274,6 +285,9 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } + // Depend on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager + webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) + // Monitor outgoing connections on default transport with conntrack. 
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc( conntrack.DialWithTracing(), @@ -310,17 +324,6 @@ func main() { var g group.Group { - g.Add( - func() error { - err := discoveryManager.Run() - level.Info(logger).Log("msg", "Discovery manager stopped") - return err - }, - func(err error) { - level.Info(logger).Log("msg", "Stopping discovery manager...") - cancelDiscovery() - }, - ) term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM) cancel := make(chan struct{}) @@ -341,6 +344,34 @@ func main() { }, ) } + { + g.Add( + func() error { + err := discoveryManager.Run() + level.Info(logger).Log("msg", "Discovery manager stopped") + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping discovery manager...") + cancelDiscovery() + }, + ) + } + { + g.Add( + func() error { + err := scrapeManager.Run(discoveryManager.SyncCh()) + level.Info(logger).Log("msg", "Scrape manager stopped") + return err + }, + func(err error) { + // Scrape manager needs to be stopped before closing the local TSDB + // so that it doesn't try to write samples to a closed storage. + level.Info(logger).Log("msg", "Stopping scrape manager...") + scrapeManager.Stop() + }, + ) + } { // Make sure that sighup handler is registered with a redirect to the channel before the potentially // long and synchronous tsdb init. @@ -482,19 +513,6 @@ func main() { }, ) } - { - g.Add( - func() error { - err := scrapeManager.Run(discoveryManager.SyncCh()) - level.Info(logger).Log("msg", "Scrape manager stopped") - return err - }, - func(err error) { - level.Info(logger).Log("msg", "Stopping scrape manager...") - cancelScrape() - }, - ) - } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) } diff --git a/discovery/manager.go b/discovery/manager.go index 7119c54705..070c5b9003 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -87,7 +87,6 @@ func (m *DiscoveryManager) Run() error { return m.ctx.Err() } } - } // SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates diff --git a/retrieval/manager.go b/retrieval/manager.go index 694d14b97b..dec7f8d9b3 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -14,7 +14,6 @@ package retrieval import ( - "context" "fmt" "github.com/go-kit/kit/log" @@ -30,27 +29,27 @@ type Appendable interface { } // NewScrapeManager is the ScrapeManager constructor -func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager { +func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { return &ScrapeManager{ - ctx: ctx, append: app, logger: logger, actionCh: make(chan func()), scrapeConfigs: make(map[string]*config.ScrapeConfig), scrapePools: make(map[string]*scrapePool), + graceShut: make(chan struct{}), } } // ScrapeManager maintains a set of scrape pools and manages start/stop cicles // when receiving new target groups form the discovery manager. type ScrapeManager struct { - ctx context.Context logger log.Logger append Appendable scrapeConfigs map[string]*config.ScrapeConfig scrapePools map[string]*scrapePool actionCh chan func() + graceShut chan struct{} } // Run starts background processing to handle target updates and reload the scraping loops. 
@@ -63,12 +62,20 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error f() case ts := <-tsets: m.reload(ts) - case <-m.ctx.Done(): - return m.ctx.Err() + case <-m.graceShut: + return nil } } } +// Stop cancels all running scrape pools and blocks until all have exited. +func (m *ScrapeManager) Stop() { + for _, sp := range m.scrapePools { + sp.stop() + } + close(m.graceShut) +} + // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { done := make(chan struct{}) @@ -123,10 +130,10 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { return fmt.Errorf("target set '%v' doesn't have valid config", tsetName) } - // scrape pool doesn't exist so start a new one + // Scrape pool doesn't exist so start a new one. existing, ok := m.scrapePools[tsetName] if !ok { - sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) m.scrapePools[tsetName] = sp sp.Sync(tgroup) @@ -134,7 +141,7 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { existing.Sync(tgroup) } - // cleanup - check config and cancel the scrape loops if it don't exist in the scrape config + // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. jobs := make(map[string]struct{}) for k := range m.scrapeConfigs { diff --git a/retrieval/scrape.go b/retrieval/scrape.go index ed1aac14db..62caadadef 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -117,7 +117,6 @@ func init() { type scrapePool struct { appendable Appendable logger log.Logger - ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig @@ -136,7 +135,7 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { +func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { if logger == nil { logger = log.NewNopLogger() } @@ -152,14 +151,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable sp := &scrapePool{ appendable: app, config: cfg, - ctx: ctx, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, logger: logger, } sp.newLoop = func(t *Target, s scraper) loop { - return newScrapeLoop(sp.ctx, s, + return newScrapeLoop( + context.Background(), + s, log.With(logger, "target", t), buffers, func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, @@ -189,7 +189,6 @@ func (sp *scrapePool) stop() { delete(sp.loops, fp) delete(sp.targets, fp) } - wg.Wait() } @@ -582,8 +581,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { } } -func newScrapeLoop( - ctx context.Context, +func newScrapeLoop(ctx context.Context, sc scraper, l log.Logger, buffers *pool.BytesPool, @@ -605,8 +603,8 @@ func newScrapeLoop( sampleMutator: sampleMutator, reportSampleMutator: reportSampleMutator, stopped: make(chan struct{}), - ctx: ctx, l: l, + ctx: ctx, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index e43ed80a7c..cc2cfc121e 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = 
&config.ScrapeConfig{} - sp = newScrapePool(context.Background(), cfg, app, nil) + sp = newScrapePool(cfg, app, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -231,7 +231,7 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp := newScrapePool(context.Background(), cfg, app, nil) + sp := newScrapePool(cfg, app, nil) wrapped := sp.appender() From c5cb0d2910692cfd77b36d7d270677f58f55d3f9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 26 Nov 2017 22:18:05 +0000 Subject: [PATCH 03/13] simplify naming and API. --- cmd/prometheus/main.go | 2 +- discovery/manager.go | 102 ++++++++++++++++++++--------------------- 2 files changed, 50 insertions(+), 54 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c60f5150b9..b8ff348c74 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -236,7 +236,7 @@ func main() { ctxRule = context.Background() notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, diff --git a/discovery/manager.go b/discovery/manager.go index 070c5b9003..ea24dc9e77 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -36,15 +36,15 @@ import ( "github.com/prometheus/prometheus/discovery/zookeeper" ) -// DiscoveryProvider provides information about target groups. It maintains a set +// Discoverer provides information about target groups. It maintains a set // of sources from which TargetGroups can originate. Whenever a discovery provider -// detects a potential change, it sends the TargetGroup through its provided channel. +// detects a potential change, it sends the TargetGroup through its channel. // -// The DiscoveryProvider does not have to guarantee that an actual change happened. +// Discoverer does not know if an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // -// DiscoveryProviders should initially send a full set of all discoverable TargetGroups. -type DiscoveryProvider interface { +// Discoverers should initially send a full set of all discoverable TargetGroups. +type Discoverer interface { // Run hands a channel to the discovery provider(consul,dns etc) through which it can send // updated target groups. // Must returns if the context gets canceled. 
It should not close the update @@ -52,33 +52,35 @@ type DiscoveryProvider interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } -type targetSetProvider struct { - cancel func() - tgroups []*config.TargetGroup -} +// type pool struct { +// cancel func() +// tgroups []*config.TargetGroup +// } -// NewDiscoveryManager is the DiscoveryManager constructor -func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager { - return &DiscoveryManager{ - ctx: ctx, - logger: logger, - actionCh: make(chan func()), - syncCh: make(chan map[string][]*config.TargetGroup), - targetSetProviders: make(map[string]map[string]*targetSetProvider), +// NewManager is the Discovery Manager constructor +func NewManager(ctx context.Context, logger log.Logger) *Manager { + return &Manager{ + ctx: ctx, + logger: logger, + actionCh: make(chan func()), + syncCh: make(chan map[string][]*config.TargetGroup), + endpoints: make(map[string]map[string][]*config.TargetGroup), + discoverCancel: []context.CancelFunc{}, } } -// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages. -type DiscoveryManager struct { - ctx context.Context - logger log.Logger - syncCh chan map[string][]*config.TargetGroup // map[targetSetName] - actionCh chan func() - targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName] +// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +type Manager struct { + ctx context.Context + logger log.Logger + syncCh chan map[string][]*config.TargetGroup // map[targetSetName] + actionCh chan func() + discoverCancel []context.CancelFunc + endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] } // Run starts the background processing -func (m *DiscoveryManager) Run() error { +func (m *Manager) Run() error { for { select { case f := <-m.actionCh: @@ -90,27 +92,27 @@ func (m *DiscoveryManager) Run() error { } // SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates -func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup { +func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { return m.syncCh } // ApplyConfig removes all running discovery providers and starts new ones using the provided config. -func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { +func (m *Manager) ApplyConfig(cfg *config.Config) error { err := make(chan error) m.actionCh <- func() { - m.cancelDiscoveryProviders() + m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { ctx, cancel := context.WithCancel(m.ctx) updates := make(chan []*config.TargetGroup) - m.createProvider(cancel, scfg.JobName, provName) + m.discoverCancel = append(m.discoverCancel, cancel) go prov.Run(ctx, updates) go func(provName string) { select { case <-ctx.Done(): - // First set of all targets the provider knows. + // First set of all endpoints the provider knows. case tgs, ok := <-updates: // Handle the case that a target provider exits and closes the channel // before the context is done. 
@@ -146,35 +148,29 @@ func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *DiscoveryManager) cancelDiscoveryProviders() { - for targetSetName, targetSetProviders := range m.targetSetProviders { - for _, discoveryProvider := range targetSetProviders { - discoveryProvider.cancel() - } - delete(m.targetSetProviders, targetSetName) +func (m *Manager) cancelDiscoverers() { + for _, c := range m.discoverCancel { + c() } + m.endpoints = make(map[string]map[string][]*config.TargetGroup) + m.discoverCancel = []context.CancelFunc{} } -func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) { - if m.targetSetProviders[tsName] == nil { - m.targetSetProviders[tsName] = make(map[string]*targetSetProvider) +// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set +func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { + if m.endpoints[tsName] == nil { + m.endpoints[tsName] = make(map[string][]*config.TargetGroup) } - m.targetSetProviders[tsName][provName] = &targetSetProvider{ - cancel: cancel, - tgroups: []*config.TargetGroup{}, - } -} + m.endpoints[tsName][provName] = []*config.TargetGroup{} -// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set -func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { tset := make(chan map[string][]*config.TargetGroup) m.actionCh <- func() { if tg != nil { - m.targetSetProviders[tsName][provName].tgroups = tg + m.endpoints[tsName][provName] = tg } var tgAll []*config.TargetGroup - for _, prov := range m.targetSetProviders[tsName] { - for _, tg := range prov.tgroups { + for _, prov := range m.endpoints[tsName] { + for _, tg := range prov { tgAll = append(tgAll, tg) } } @@ -185,10 +181,10 @@ func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.Tar return <-tset } -func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider { - providers := map[string]DiscoveryProvider{} +func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { + providers := map[string]Discoverer{} - app := func(mech string, i int, tp DiscoveryProvider) { + app := func(mech string, i int, tp Discoverer) { providers[fmt.Sprintf("%s/%d", mech, i)] = tp } @@ -280,7 +276,7 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { return &StaticProvider{groups} } -// Run implements the DiscoveryProvider interface. +// Run implements the Worker interface. func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // We still have to consider that the consumer exits right away in which case // the context will be canceled. From f5c2c5ff8fc8e21f9bfaa23d0f81c8245ba8632e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 27 Nov 2017 01:59:34 +0000 Subject: [PATCH 04/13] brake the start provider func so that can run unit tests against it. 
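With startProvider extracted, a unit test can drive it directly with a fake Discoverer. A rough, hypothetical sketch of such a test (not part of this patch series; fakeDiscoverer and the test name are made up for illustration, and it relies only on NewManager, startProvider and SyncCh as they appear in the diff below):

package discovery

import (
	"context"
	"testing"

	"github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/config"
)

// fakeDiscoverer sends a fixed set of target groups once and then idles.
type fakeDiscoverer struct {
	tgs []*config.TargetGroup
}

func (f *fakeDiscoverer) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	select {
	case up <- f.tgs:
	case <-ctx.Done():
	}
	<-ctx.Done()
}

func TestStartProviderSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := NewManager(ctx, log.NewNopLogger())
	go m.Run() // Run services the internal action channel used by mergeGroups.

	m.startProvider("job1", "fake/0", &fakeDiscoverer{
		tgs: []*config.TargetGroup{{Source: "s1"}},
	})

	tsets := <-m.SyncCh()
	if len(tsets["job1"]) != 1 {
		t.Fatalf("expected 1 target group for job1, got %d", len(tsets["job1"]))
	}
}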
--- discovery/manager.go | 78 +++++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index ea24dc9e77..77c9598d37 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -103,43 +103,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - ctx, cancel := context.WithCancel(m.ctx) - updates := make(chan []*config.TargetGroup) - - m.discoverCancel = append(m.discoverCancel, cancel) - - go prov.Run(ctx, updates) - go func(provName string) { - select { - case <-ctx.Done(): - // First set of all endpoints the provider knows. - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) - } - } - }(provName) + m.startProvider(scfg.JobName, provName, prov) } } close(err) @@ -148,6 +112,46 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } +func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { + ctx, cancel := context.WithCancel(m.ctx) + updates := make(chan []*config.TargetGroup) + + m.discoverCancel = append(m.discoverCancel, cancel) + + go worker.Run(ctx, updates) + go func(provName string) { + select { + case <-ctx.Done(): + // First set of all endpoints the provider knows. + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + break + } + m.syncCh <- m.mergeGroups(jobName, provName, tgs) + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + m.syncCh <- m.mergeGroups(jobName, provName, tgs) + } + } + }(provName) +} + func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } From fe6c544532360a09c3cbdc6d235d6798b366263d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 29 Nov 2017 22:38:52 +0000 Subject: [PATCH 05/13] some renaming and comment fixes. Remove some select state that is most likely obsolete and hopefully doesn't break anything :) Merged targets are sorted by Discoverer name so we can have consistent tests for the maps.
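The deterministic order mentioned above comes from sorting the provider names before merging, because Go randomizes map iteration order. A small standalone sketch of the idiom, with illustrative names rather than the actual manager fields:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical per-provider groups; in the manager these are []*config.TargetGroup per Discoverer.
	groupsByProvider := map[string][]string{
		"static/0": {"t1", "t2"},
		"dns/0":    {"t3"},
	}

	// Maps cannot be iterated in a fixed order, so collect the keys and sort them first.
	names := make([]string, 0, len(groupsByProvider))
	for name := range groupsByProvider {
		names = append(names, name)
	}
	sort.Strings(names)

	var merged []string
	for _, name := range names {
		merged = append(merged, groupsByProvider[name]...)
	}
	fmt.Println(merged) // Always [t3 t1 t2]: "dns/0" sorts before "static/0".
}
```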
--- discovery/manager.go | 60 +++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 77c9598d37..fa8cb183ec 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -16,7 +16,7 @@ package discovery import ( "context" "fmt" - "time" + "sort" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -64,7 +64,7 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager { logger: logger, actionCh: make(chan func()), syncCh: make(chan map[string][]*config.TargetGroup), - endpoints: make(map[string]map[string][]*config.TargetGroup), + targets: make(map[string]map[string][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, } } @@ -76,7 +76,7 @@ type Manager struct { syncCh chan map[string][]*config.TargetGroup // map[targetSetName] actionCh chan func() discoverCancel []context.CancelFunc - endpoints map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] + targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] } // Run starts the background processing @@ -86,12 +86,13 @@ func (m *Manager) Run() error { case f := <-m.actionCh: f() case <-m.ctx.Done(): + m.cancelDiscoverers() return m.ctx.Err() } } } -// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates +// SyncCh returns a read only channel used by all Discoverers to send target updates. func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { return m.syncCh } @@ -120,22 +121,6 @@ func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { go worker.Run(ctx, updates) go func(provName string) { - select { - case <-ctx.Done(): - // First set of all endpoints the provider knows. - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - m.syncCh <- m.mergeGroups(jobName, provName, tgs) - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - // Start listening for further updates. for { select { case <-ctx.Done(): @@ -156,26 +141,37 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.endpoints = make(map[string]map[string][]*config.TargetGroup) + m.targets = make(map[string]map[string][]*config.TargetGroup) m.discoverCancel = []context.CancelFunc{} } // mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { - if m.endpoints[tsName] == nil { - m.endpoints[tsName] = make(map[string][]*config.TargetGroup) - } - m.endpoints[tsName][provName] = []*config.TargetGroup{} - tset := make(chan map[string][]*config.TargetGroup) + m.actionCh <- func() { - if tg != nil { - m.endpoints[tsName][provName] = tg + if m.targets[tsName] == nil { + m.targets[tsName] = make(map[string][]*config.TargetGroup) } - var tgAll []*config.TargetGroup - for _, prov := range m.endpoints[tsName] { - for _, tg := range prov { - tgAll = append(tgAll, tg) + m.targets[tsName][provName] = []*config.TargetGroup{} + + if tg != nil { + m.targets[tsName][provName] = tg + } + tgAll := []*config.TargetGroup{} + + // Sort the providers alphabetically. 
+ // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. + var providerNames []string + for providerName := range m.targets[tsName] { + providerNames = append(providerNames, providerName) + } + sort.Strings(providerNames) + for _, prov := range providerNames { + for _, tg := range m.targets[tsName][prov] { + if tg.Source != "" { // Don't add empty targets. + tgAll = append(tgAll, tg) + } } } t := make(map[string][]*config.TargetGroup) From aca8f85699211c2453f0121a4015975c605fc2dd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 29 Nov 2017 22:52:38 +0000 Subject: [PATCH 06/13] fixed the tests --- discovery/manager_test.go | 1271 ++++++++++++++----------------------- web/web_test.go | 4 +- 2 files changed, 473 insertions(+), 802 deletions(-) diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 5c1fe6e653..c4ddac462a 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -15,9 +15,9 @@ package discovery import ( "context" + "fmt" "reflect" - "sync" - "sync/atomic" + "strconv" "testing" "time" @@ -26,20 +26,23 @@ import ( yaml "gopkg.in/yaml.v2" ) -func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { +// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order. +func TestDiscoveryManagerSyncCalls(t *testing.T) { + + // The order in which the updates are sent is determined by the interval passed to the mock discovery adapter. + // Final targets array is ordered alphabetically by the name of the discoverer. + // For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge. testCases := []struct { - title string - updates map[string][]update - expectedSyncCalls [][]string + title string + updates map[string][]update + expectedTargets [][]*config.TargetGroup }{ { title: "Single TP no updates", updates: map[string][]update{ "tp1": {}, }, - expectedSyncCalls: [][]string{ - {}, - }, + expectedTargets: nil, }, { title: "Multips TPs no updates", updates: map[string][]update{ @@ -48,9 +51,7 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { "tp2": {}, "tp3": {}, }, - expectedSyncCalls: [][]string{ - {}, - }, + expectedTargets: nil, }, { title: "Single TP empty initials", @@ -62,7 +63,7 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { }, }, }, - expectedSyncCalls: [][]string{ + expectedTargets: [][]*config.TargetGroup{ {}, }, }, @@ -78,7 +79,7 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { "tp2": { { targetGroups: []config.TargetGroup{}, - interval: 500, + interval: 200, }, }, "tp3": { @@ -88,27 +89,9 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { }, }, }, - expectedSyncCalls: [][]string{ + expectedTargets: [][]*config.TargetGroup{ + {}, {}, - }, - }, - { - title: "Multiple TPs empty initials with a delay", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{}, - interval: 6000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{}, - interval: 6500, - }, - }, - }, - expectedSyncCalls: [][]string{ {}, }, }, @@ -117,13 +100,26 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }}, }, }, }, - expectedSyncCalls:
[][]string{ - {"initial1", "initial2"}, + expectedTargets: [][]*config.TargetGroup{ + {{ + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }}, }, }, { @@ -131,64 +127,67 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 0, + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, }, }, "tp2": { { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}}, - interval: 0, + targetGroups: []config.TargetGroup{{ + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }}, + interval: 10, }, }, }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp2-initial1"}, - }, - }, - { - title: "Single TP delayed initials only", - updates: map[string][]update{ - "tp1": { + expectedTargets: [][]*config.TargetGroup{ + { { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 6000, + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - {"initial1", "initial2"}, - }, - }, - { - title: "Multiple TPs with some delayed initials", - updates: map[string][]update{ - "tp1": { + }, { { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 100, + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, }, - }, - "tp2": { { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}, {Source: "tp2-initial3"}}, - interval: 6000, + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, }, }, }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2", "tp2-initial3"}, - }, }, { title: "Single TP initials followed by empty updates", updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, }, { targetGroups: []config.TargetGroup{}, @@ -196,8 +195,18 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { }, }, }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + {}, }, }, { @@ -205,18 +214,54 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: 
[]model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, }, { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 10, + targetGroups: []config.TargetGroup{ + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 10, }, }, }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - {"initial1", "initial2", "update1", "update2"}, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, }, }, { @@ -224,307 +269,348 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10, }, { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, + targetGroups: []config.TargetGroup{ + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 500, }, }, "tp2": { { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 100, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 100, }, { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}, {Source: "tp2-update2"}}, - interval: 10, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 10, }, }, }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2", "tp1-update1", "tp1-update2", "tp2-update1", "tp2-update2"}, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: 
[]model.LabelSet{{"__instance__": "8"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, }, }, { - title: "One tp initials arrive after other tp updates but still within 5 seconds", + title: "One tp initials arrive after other tp updates.", updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10, }, { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, + targetGroups: []config.TargetGroup{ + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 150, }, }, "tp2": { { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 2000, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 200, }, { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}, {Source: "tp2-update2"}}, - interval: 1000, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 100, }, }, }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2", "tp2-update1", "tp2-update2"}, - }, - }, - { - title: "One tp initials arrive after other tp updates and after 5 seconds", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update3"}}, - interval: 5000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 6000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2"}, - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2", "tp1-update3"}, - }, - }, - { - title: "Single TP initials and new groups after a delay", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 6000, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - {"initial1", "initial2", "update1", "update2"}, - }, - }, - { - title: "Single TP initial and successive 
updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{{Source: "update2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update3"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1"}, - {"initial1", "update1", "update2", "update3"}, - }, - }, - { - title: "Multiple TPs initials and successive updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update2"}}, - interval: 2000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update3"}}, - interval: 2000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}}, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update2"}}, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update3"}}, - interval: 2000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-update1", "tp2-initial1"}, - {"tp1-initial1", "tp1-update1", "tp2-initial1", "tp1-update2", "tp1-update3", "tp2-update1", "tp2-update2"}, - {"tp1-initial1", "tp1-update1", "tp2-initial1", "tp1-update2", "tp1-update3", "tp2-update1", "tp2-update2", "tp2-update3"}, - }, - }, - { - title: "Single TP Multiple updates 5 second window", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}}, - interval: 25, - }, - { - targetGroups: []config.TargetGroup{{Source: "update2"}, {Source: "update3"}, {Source: "update4"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update5"}}, - interval: 0, - }, - { - targetGroups: []config.TargetGroup{{Source: "update6"}, {Source: "update7"}, {Source: "update8"}}, - interval: 70, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1"}, - {"initial1", "update1", "update2", "update3", "update4", "update5", "update6", "update7", "update8"}, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: 
[]model.LabelSet{{"__instance__": "8"}}, + }, + }, }, }, + { title: "Single TP Single provider empty update in between", updates: map[string][]update{ "tp1": { { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 30, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 300, + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 30, }, { targetGroups: []config.TargetGroup{}, interval: 10, }, { - targetGroups: []config.TargetGroup{{Source: "update3"}, {Source: "update4"}, {Source: "update5"}, {Source: "update6"}}, - interval: 6000, + targetGroups: []config.TargetGroup{ + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 300, }, }, }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - {"initial1", "initial2", "update1", "update2"}, - {"initial1", "initial2", "update1", "update2", "update3", "update4", "update5", "update6"}, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + {}, + { + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, }, }, } - for i, testCase := range testCases { - finalize := make(chan bool) - - syncCallCount := 0 - syncedGroups := make([][]string, 0) - - targetSet := NewTargetSet(&mockSyncer{ - sync: func(tgs []*config.TargetGroup) { - - currentCallGroup := make([]string, len(tgs)) - for i, tg := range tgs { - currentCallGroup[i] = tg.Source - } - syncedGroups = append(syncedGroups, currentCallGroup) - - syncCallCount++ - if syncCallCount == len(testCase.expectedSyncCalls) { - // All the groups are sent, we can start asserting. - close(finalize) - } - }, - }) - + for testIndex, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + discoveryManager := NewManager(ctx, nil) + go discoveryManager.Run() - targetProviders := map[string]TargetProvider{} - for tpName, tpUpdates := range testCase.updates { - tp := newMockTargetProvider(tpUpdates) - targetProviders[tpName] = tp + var totalUpdatesCount int + for tpName, update := range testCase.updates { + provider := newMockDiscoveryProvider(update) + discoveryManager.startProvider(strconv.Itoa(testIndex), tpName, provider) + + if len(update) > 0 { + totalUpdatesCount = totalUpdatesCount + len(update) + } } - go targetSet.Run(ctx) - targetSet.UpdateProviders(targetProviders) + Loop: + for x := 0; x < totalUpdatesCount; x++ { + select { + case <-time.After(10 * time.Second): + t.Errorf("%v. 
%q: no update arrived within the timeout limit", x, testCase.title) + break Loop + case tsetMap := <-discoveryManager.SyncCh(): + for _, received := range tsetMap { + if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { + var receivedFormated string + for _, receivedTargets := range received { + receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) + } + var expectedFormated string + for _, receivedTargets := range testCase.expectedTargets[x] { + expectedFormated = expectedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) + } - select { - case <-time.After(20 * time.Second): - t.Errorf("%d. %q: Test timed out after 20 seconds. All targets should be sent within the timeout", i, testCase.title) - - case <-finalize: - for name, tp := range targetProviders { - runCallCount := tp.(mockTargetProvider).callCount() - if runCallCount != 1 { - t.Errorf("%d. %q: TargetProvider Run should be called once for each target provider. For %q was called %d times", i, testCase.title, name, runCallCount) - } - } - - if len(syncedGroups) != len(testCase.expectedSyncCalls) { - t.Errorf("%d. %q: received sync calls: \n %v \n do not match expected calls: \n %v \n", i, testCase.title, syncedGroups, testCase.expectedSyncCalls) - } - - for j := range syncedGroups { - if len(syncedGroups[j]) != len(testCase.expectedSyncCalls[j]) { - t.Errorf("%d. %q: received sync calls in call [%v]: \n %v \n do not match expected calls: \n %v \n", i, testCase.title, j, syncedGroups[j], testCase.expectedSyncCalls[j]) - } - - expectedGroupsMap := make(map[string]struct{}) - for _, expectedGroup := range testCase.expectedSyncCalls[j] { - expectedGroupsMap[expectedGroup] = struct{}{} - } - - for _, syncedGroup := range syncedGroups[j] { - if _, ok := expectedGroupsMap[syncedGroup]; !ok { - t.Errorf("%d. %q: '%s' does not exist in expected target groups: %s", i, testCase.title, syncedGroup, testCase.expectedSyncCalls[j]) - } else { - delete(expectedGroupsMap, syncedGroup) // Remove used targets from the map. + t.Errorf("%v. 
%v: \ntargets mismatch \nreceived: %v \nexpected: %v", + x, testCase.title, + receivedFormated, + expectedFormated) } } } @@ -532,480 +618,75 @@ func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { } } -func TestTargetSetConsolidatesToTheLatestState(t *testing.T) { - testCases := []struct { - title string - updates map[string][]update - }{ - { - title: "Single TP update same initial group multiple times", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}, {"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 250, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 250, - }, - }, - }, - }, - { - title: "Multiple TPs update", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 3, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 10, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 3, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 10, - }, - }, - }, - }, - { - title: "Multiple TPs update II", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.13:6003"}}, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - { - Source: "tp1-initial1", - Targets: []model.LabelSet{}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.16:6003"}, - {"__instance__": "10.11.122.17:6003"}, - {"__instance__": "10.11.122.18:6003"}, - }, - }, - }, - interval: 100, - }, - }, - "tp2": { - { - targetGroups: 
[]config.TargetGroup{}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.13:6003"}}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - { - Source: "tp2-update1", - Targets: []model.LabelSet{}, - }, - }, - interval: 300, - }, - }, - }, - }, - { - title: "Three rounds of sync call", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.11:6003"}, - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - }, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.14:6003"}}, - }, - }, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.15:6003"}, - {"__instance__": "10.11.122.16:6003"}, - }, - }, - }, - interval: 3000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.11:6003"}, - }, - }, - { - Source: "tp2-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.14:6003"}}, - }, - { - Source: "tp2-initial3", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 6000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}, {"__instance__": "10.11.122.12:6003"}}, - }, - { - Source: "tp2-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 3000, - }, - }, - }, - }, - } - - // Function to determine if the sync call received the latest state of - // all the target groups that came out of the target provider. - endStateAchieved := func(groupsSentToSyc []*config.TargetGroup, endState map[string]config.TargetGroup) bool { - - if len(groupsSentToSyc) != len(endState) { - return false - } - - for _, tg := range groupsSentToSyc { - if _, ok := endState[tg.Source]; ok == false { - // The target group does not exist in the end state. - return false - } - - if reflect.DeepEqual(endState[tg.Source], *tg) == false { - // The target group has not reached its final state yet. - return false - } - - delete(endState, tg.Source) // Remove used target groups. 
- } - - return true - } - - for i, testCase := range testCases { - expectedGroups := make(map[string]config.TargetGroup) - for _, tpUpdates := range testCase.updates { - for _, update := range tpUpdates { - for _, targetGroup := range update.targetGroups { - expectedGroups[targetGroup.Source] = targetGroup - } - } - } - - finalize := make(chan bool) - - targetSet := NewTargetSet(&mockSyncer{ - sync: func(tgs []*config.TargetGroup) { - - endState := make(map[string]config.TargetGroup) - for k, v := range expectedGroups { - endState[k] = v - } - - if endStateAchieved(tgs, endState) == false { - return - } - - close(finalize) - }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - targetProviders := map[string]TargetProvider{} - for tpName, tpUpdates := range testCase.updates { - tp := newMockTargetProvider(tpUpdates) - targetProviders[tpName] = tp - } - - go targetSet.Run(ctx) - targetSet.UpdateProviders(targetProviders) - - select { - case <-time.After(20 * time.Second): - t.Errorf("%d. %q: Test timed out after 20 seconds. All targets should be sent within the timeout", i, testCase.title) - - case <-finalize: - // System successfully reached to the end state. - } - } -} - func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) { - if _, ok := tgroups[name]; ok != present { + verifyPresence := func(tSets map[string]map[string][]*config.TargetGroup, tSetName string, provName, label string, present bool) { + if _, ok := tSets[tSetName]; !ok { + t.Fatalf("'%s' should be present in TargetSets: %v", tSetName, tSets) + return + } + if _, ok := tSets[tSetName][provName]; !ok { + t.Fatalf("'%s' should be present in Discovery providers: %v", provName, tSets[tSetName]) + return + } + + match := false + var mergedTargets string + for _, targetGroup := range tSets[tSetName][provName] { + + for _, l := range targetGroup.Targets { + mergedTargets = mergedTargets + " " + l.String() + if l.String() == label { + match = true + } + } + + } + if match != present { msg := "" if !present { - msg = "not " + msg = "not" } - t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups) + t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets) } } - cfg := &config.ServiceDiscoveryConfig{} + cfg := &config.Config{} sOne := ` -static_configs: -- targets: ["foo:9090"] -- targets: ["bar:9090"] +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ["foo:9090"] + - targets: ["bar:9090"] ` if err := yaml.Unmarshal([]byte(sOne), cfg); err != nil { t.Fatalf("Unable to load YAML config sOne: %s", err) } - called := make(chan struct{}) - - ts := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { called <- struct{}{} }, - }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + discoveryManager := NewManager(ctx, nil) + go discoveryManager.Run() - go ts.Run(ctx) + discoveryManager.ApplyConfig(cfg) - ts.UpdateProviders(ProvidersFromConfig(*cfg, nil)) - <-called - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", true) + _ = <-discoveryManager.SyncCh() + verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", true) sTwo := ` -static_configs: -- targets: ["foo:9090"] +scrape_configs: + - job_name: 'prometheus' + 
static_configs: + - targets: ["foo:9090"] ` if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil { - t.Fatalf("Unable to load YAML config sTwo: %s", err) + t.Fatalf("Unable to load YAML config sOne: %s", err) } + discoveryManager.ApplyConfig(cfg) - ts.UpdateProviders(ProvidersFromConfig(*cfg, nil)) - <-called - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", false) -} - -func TestTargetSetRunsSameTargetProviderMultipleTimes(t *testing.T) { - var wg sync.WaitGroup - - wg.Add(2) - - ts1 := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { wg.Done() }, - }) - - ts2 := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { wg.Done() }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - updates := []update{ - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 10, - }, - } - - tp := newMockTargetProvider(updates) - targetProviders := map[string]TargetProvider{} - targetProviders["testProvider"] = tp - - go ts1.Run(ctx) - go ts2.Run(ctx) - - ts1.UpdateProviders(targetProviders) - ts2.UpdateProviders(targetProviders) - - finalize := make(chan struct{}) - go func() { - defer close(finalize) - wg.Wait() - }() - - select { - case <-time.After(20 * time.Second): - t.Error("Test timed out after 20 seconds. All targets should be sent within the timeout") - - case <-finalize: - if tp.callCount() != 2 { - t.Errorf("Was expecting 2 calls, received %d", tp.callCount()) - } - } -} - -type mockSyncer struct { - sync func(tgs []*config.TargetGroup) -} - -func (s *mockSyncer) Sync(tgs []*config.TargetGroup) { - if s.sync != nil { - s.sync(tgs) - } + _ = <-discoveryManager.SyncCh() + verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", false) } type update struct { @@ -1013,30 +694,25 @@ type update struct { interval time.Duration } -type mockTargetProvider struct { - _callCount *int32 - updates []update - up chan<- []*config.TargetGroup +type mockdiscoveryProvider struct { + updates []update + up chan<- []*config.TargetGroup } -func newMockTargetProvider(updates []update) mockTargetProvider { - var callCount int32 +func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider { - tp := mockTargetProvider{ - _callCount: &callCount, - updates: updates, + tp := mockdiscoveryProvider{ + updates: updates, } - return tp } -func (tp mockTargetProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { - atomic.AddInt32(tp._callCount, 1) +func (tp mockdiscoveryProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { tp.up = up tp.sendUpdates() } -func (tp mockTargetProvider) sendUpdates() { +func (tp mockdiscoveryProvider) sendUpdates() { for _, update := range tp.updates { time.Sleep(update.interval * time.Millisecond) @@ -1045,11 +721,6 @@ func (tp mockTargetProvider) sendUpdates() { for i := range update.targetGroups { tgs[i] = &update.targetGroups[i] } - tp.up <- tgs } } - -func (tp mockTargetProvider) callCount() int { - return int(atomic.LoadInt32(tp._callCount)) -} diff --git a/web/web_test.go b/web/web_test.go index b17a0b1c20..7b02935437 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -96,7 +96,7 @@ func TestReadyAndHealthy(t *testing.T) { Context: nil, Storage: &tsdb.ReadyStorage{}, QueryEngine: nil, - TargetManager: nil, + ScrapeManager: nil, RuleManager: nil, Notifier: nil, 
RoutePrefix: "/", @@ -187,7 +187,7 @@ func TestRoutePrefix(t *testing.T) { Context: nil, Storage: &tsdb.ReadyStorage{}, QueryEngine: nil, - TargetManager: nil, + ScrapeManager: nil, RuleManager: nil, Notifier: nil, RoutePrefix: "/prometheus", From f2df712166ef4e16e69d8aace76202619483c0f8 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 29 Nov 2017 22:56:08 +0000 Subject: [PATCH 07/13] updated README --- discovery/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/discovery/README.md b/discovery/README.md index a2899a35db..a4bdde9f34 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -133,9 +133,9 @@ the Prometheus server will be able to see them. A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [`TargetGroup`](https://godoc.org/github.com/prometheus/prometheus/config#TargetGroup). The SD mechanism sends the targets down to prometheus as list of `TargetGroups`. -An SD mechanism has to implement the `TargetProvider` Interface: +An SD mechanism has to implement the `Discoverer` Interface: ```go -type TargetProvider interface { +type Discoverer interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } ``` From b0d4f6ee08632e09d0b0cc8ec0b2a0cbf61b5861 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 29 Nov 2017 23:16:36 +0000 Subject: [PATCH 08/13] resolved merge confilc in main.go --- cmd/prometheus/main.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b8ff348c74..a3ede6197c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -239,31 +239,18 @@ func main() { discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, - Notifier: notifier, - QueryEngine: queryEngine, - Context: ctxRule, + ruleManager := rules.NewManager(&rules.ManagerOptions{ + Appendable: fanoutStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine), + NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), + Context: ctx, ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, Logger: log.With(logger, "component", "rule manager"), }) ) -<<<<<<< HEAD - ctx := context.Background() - ruleManager := rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine), - NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), - Context: ctx, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - }) - - cfg.web.Context = ctx -======= cfg.web.Context = ctxWeb ->>>>>>> 95b1dec3... scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage. 
cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine @@ -295,7 +282,7 @@ func main() { reloaders := []func(cfg *config.Config) error{ remoteStorage.ApplyConfig, - targetManager.ApplyConfig, + discoveryManager.ApplyConfig, webHandler.ApplyConfig, notifier.ApplyConfig, func(cfg *config.Config) error { From 6ff1d5c51e3cb0e0df58aa90a473317e25c6d4ac Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 29 Nov 2017 23:56:57 +0000 Subject: [PATCH 09/13] add the scrape manager config reloader; handle errors with invalid scrape config --- cmd/prometheus/main.go | 1 + retrieval/manager.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a3ede6197c..f09f0ce425 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -285,6 +285,7 @@ func main() { discoveryManager.ApplyConfig, webHandler.ApplyConfig, notifier.ApplyConfig, + scrapeManager.ApplyConfig, func(cfg *config.Config) error { // Get all rule files matching the configuration oaths. var files []string diff --git a/retrieval/manager.go b/retrieval/manager.go index dec7f8d9b3..4f1f9775c9 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -61,7 +61,9 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error case f := <-m.actionCh: f() case ts := <-tsets: - m.reload(ts) + if err := m.reload(ts); err != nil { + level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err) + } case <-m.graceShut: return nil } From 1ec76d1950862c1cd5263065616212a97670449d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 1 Dec 2017 12:59:24 +0000 Subject: [PATCH 10/13] rearrange the context variables and logic; split the groupsMerge function into set and get; other small nits --- cmd/prometheus/main.go | 10 ++--- discovery/manager.go | 80 ++++++++++++++++++++++----------------- discovery/manager_test.go | 50 ++++++++++++++---------- retrieval/manager.go | 2 +- 4 files changed, 81 insertions(+), 61 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f09f0ce425..757ec5e856 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -231,12 +231,11 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - ctxWeb, cancelWeb = context.WithCancel(context.Background()) - ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) - ctxRule = context.Background() + ctxWeb, cancelWeb = context.WithCancel(context.Background()) + ctxRule = context.Background() notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ruleManager := rules.NewManager(&rules.ManagerOptions{ @@ -333,9 +332,10 @@ func main() { ) } { + ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background()) g.Add( func() error { - err := discoveryManager.Run() + err := discoveryManager.Run(ctxDiscovery) level.Info(logger).Log("msg", "Discovery manager stopped") return err }, func(err error) { diff --git a/discovery/manager.go b/discovery/manager.go index fa8cb183ec..3cb68d060b 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -58,11 +58,10
@@ type Discoverer interface { // } // NewManager is the Discovery Manager constructor -func NewManager(ctx context.Context, logger log.Logger) *Manager { +func NewManager(logger log.Logger) *Manager { return &Manager{ - ctx: ctx, logger: logger, - actionCh: make(chan func()), + actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*config.TargetGroup), targets: make(map[string]map[string][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, @@ -70,24 +69,25 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager { } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. +// Targets pool is kept in a map with a format map[targetSetName]map[providerName]. type Manager struct { - ctx context.Context logger log.Logger - syncCh chan map[string][]*config.TargetGroup // map[targetSetName] - actionCh chan func() + syncCh chan map[string][]*config.TargetGroup + actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] + targets map[string]map[string][]*config.TargetGroup } // Run starts the background processing -func (m *Manager) Run() error { +func (m *Manager) Run(ctx context.Context) error { for { select { case f := <-m.actionCh: - f() - case <-m.ctx.Done(): + f(ctx) + case <-ctx.Done(): m.cancelDiscoverers() - return m.ctx.Err() + return ctx.Err() } } } @@ -100,11 +100,11 @@ func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { // ApplyConfig removes all running discovery providers and starts new ones using the provided config. func (m *Manager) ApplyConfig(cfg *config.Config) error { err := make(chan error) - m.actionCh <- func() { + m.actionCh <- func(ctx context.Context) { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(scfg.JobName, provName, prov) + m.startProvider(ctx, scfg.JobName, provName, prov) } } close(err) @@ -113,28 +113,31 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { - ctx, cancel := context.WithCancel(m.ctx) +func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) { + ctx, cancel := context.WithCancel(ctx) updates := make(chan []*config.TargetGroup) m.discoverCancel = append(m.discoverCancel, cancel) go worker.Run(ctx, updates) - go func(provName string) { - for { - select { - case <-ctx.Done(): + go m.runProvider(ctx, provName, jobName, updates) +} + +func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) { + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. 
- if !ok { - return - } - m.syncCh <- m.mergeGroups(jobName, provName, tgs) } + m.addGroup(jobName, provName, tgs) + m.syncCh <- m.allGroups(jobName) } - }(provName) + } } func (m *Manager) cancelDiscoverers() { @@ -142,25 +145,33 @@ func (m *Manager) cancelDiscoverers() { c() } m.targets = make(map[string]map[string][]*config.TargetGroup) - m.discoverCancel = []context.CancelFunc{} + m.discoverCancel = nil } -// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set -func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { - tset := make(chan map[string][]*config.TargetGroup) +func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { + done := make(chan struct{}) - m.actionCh <- func() { + m.actionCh <- func(ctx context.Context) { if m.targets[tsName] == nil { m.targets[tsName] = make(map[string][]*config.TargetGroup) } - m.targets[tsName][provName] = []*config.TargetGroup{} if tg != nil { m.targets[tsName][provName] = tg } + close(done) + + } + <-done +} + +func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup { + tset := make(chan map[string][]*config.TargetGroup) + + m.actionCh <- func(ctx context.Context) { tgAll := []*config.TargetGroup{} - // Sort the providers alphabetically. + // Sorting the providers is needed so that we can have predictable tests. // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. var providerNames []string for providerName := range m.targets[tsName] { @@ -179,6 +190,7 @@ func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) tset <- t } return <-tset + } func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { diff --git a/discovery/manager_test.go b/discovery/manager_test.go index c4ddac462a..3e5b41f472 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -113,13 +113,16 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, }, expectedTargets: [][]*config.TargetGroup{ - {{ - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { - Source: "initial2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }}, + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, }, }, { @@ -131,7 +134,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, @@ -140,10 +144,12 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, "tp2": { { - targetGroups: []config.TargetGroup{{ - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }}, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, interval: 10, }, }, @@ -153,7 +159,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, @@ -161,7 +168,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, 
}, @@ -576,13 +584,13 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { for testIndex, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) - go discoveryManager.Run() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) var totalUpdatesCount int for tpName, update := range testCase.updates { provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(strconv.Itoa(testIndex), tpName, provider) + discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider) if len(update) > 0 { totalUpdatesCount = totalUpdatesCount + len(update) @@ -603,8 +611,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) } var expectedFormated string - for _, receivedTargets := range testCase.expectedTargets[x] { - expectedFormated = expectedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) + for _, expectedTargets := range testCase.expectedTargets[x] { + expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets) } t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v", @@ -664,8 +672,8 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) - go discoveryManager.Run() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) discoveryManager.ApplyConfig(cfg) diff --git a/retrieval/manager.go b/retrieval/manager.go index 4f1f9775c9..e1bfc0ff99 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -41,7 +41,7 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { } } -// ScrapeManager maintains a set of scrape pools and manages start/stop cicles +// ScrapeManager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups form the discovery manager. type ScrapeManager struct { logger log.Logger From 80182a5d82d0f29fe8ec483f2ec1ae6d10f9f1de Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 3 Dec 2017 13:56:28 +0000 Subject: [PATCH 11/13] use poolKey as the pool map key to avoid multi dimensional maps --- discovery/manager.go | 77 ++++++++++++++++++++------------------- discovery/manager_test.go | 22 +++++------ 2 files changed, 48 insertions(+), 51 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 3cb68d060b..dbfc0bda48 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -52,10 +52,18 @@ type Discoverer interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } -// type pool struct { -// cancel func() -// tgroups []*config.TargetGroup -// } +type poolKey struct { + set string + provider string +} + +// byProvider implements sort.Interface for []poolKey based on the provider field. +// Sorting is needed so that we can have predictable tests. 
+type byProvider []poolKey + +func (a byProvider) Len() int { return len(a) } +func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider } // NewManager is the Discovery Manager constructor func NewManager(logger log.Logger) *Manager { @@ -63,20 +71,19 @@ func NewManager(logger log.Logger) *Manager { logger: logger, actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*config.TargetGroup), - targets: make(map[string]map[string][]*config.TargetGroup), + targets: make(map[poolKey][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, } } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. -// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. -// Targets pool is kept in a map with a format map[targetSetName]map[providerName]. type Manager struct { logger log.Logger - syncCh chan map[string][]*config.TargetGroup actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[string]map[string][]*config.TargetGroup + targets map[poolKey][]*config.TargetGroup + // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. + syncCh chan map[string][]*config.TargetGroup } // Run starts the background processing @@ -104,7 +111,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(ctx, scfg.JobName, provName, prov) + m.startProvider(ctx, poolKey{set: scfg.JobName, provider: provName}, prov) } } close(err) @@ -113,17 +120,17 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) { +func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { ctx, cancel := context.WithCancel(ctx) updates := make(chan []*config.TargetGroup) m.discoverCancel = append(m.discoverCancel, cancel) go worker.Run(ctx, updates) - go m.runProvider(ctx, provName, jobName, updates) + go m.runProvider(ctx, poolKey, updates) } -func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) { +func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) { for { select { case <-ctx.Done(): @@ -134,8 +141,8 @@ func (m *Manager) runProvider(ctx context.Context, provName, jobName string, upd if !ok { return } - m.addGroup(jobName, provName, tgs) - m.syncCh <- m.allGroups(jobName) + m.addGroup(poolKey, tgs) + m.syncCh <- m.allGroups(poolKey) } } } @@ -144,20 +151,16 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.targets = make(map[string]map[string][]*config.TargetGroup) + m.targets = make(map[poolKey][]*config.TargetGroup) m.discoverCancel = nil } -func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { +func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if m.targets[tsName] == nil { - m.targets[tsName] = make(map[string][]*config.TargetGroup) - } - if tg != nil { - m.targets[tsName][provName] = tg + m.targets[poolKey] = tg } close(done) @@ -165,31 +168,29 
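startProvider and cancelDiscoverers above implement a simple lifecycle: each provider runs under a context derived from the caller's, its cancel function is collected, and applying a new configuration cancels them all before new providers are started. Here is a hedged sketch of that lifecycle with an inline worker standing in for a real Discoverer; the names and the string updates are illustrative only.

    package main

    import (
        "context"
        "fmt"
    )

    type manager struct {
        cancels []context.CancelFunc
    }

    // startProvider wires one worker to its own cancellable context and
    // remembers the cancel func, as the real startProvider does.
    func (m *manager) startProvider(ctx context.Context, run func(ctx context.Context, up chan<- string)) <-chan string {
        ctx, cancel := context.WithCancel(ctx)
        m.cancels = append(m.cancels, cancel)
        updates := make(chan string)
        go run(ctx, updates)
        return updates
    }

    // cancelAll is the moral equivalent of cancelDiscoverers: stop every
    // worker and drop the collected cancel funcs.
    func (m *manager) cancelAll() {
        for _, c := range m.cancels {
            c()
        }
        m.cancels = nil
    }

    func main() {
        m := &manager{}
        updates := m.startProvider(context.Background(), func(ctx context.Context, up chan<- string) {
            select {
            case up <- "static/0 discovered foo:9090":
            case <-ctx.Done():
            }
        })
        fmt.Println(<-updates)
        m.cancelAll()
    }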
@@ func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { <-done } -func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup { - tset := make(chan map[string][]*config.TargetGroup) +func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup { + tSets := make(chan map[string][]*config.TargetGroup) m.actionCh <- func(ctx context.Context) { - tgAll := []*config.TargetGroup{} - // Sorting the providers is needed so that we can have predictable tests. - // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. - var providerNames []string - for providerName := range m.targets[tsName] { - providerNames = append(providerNames, providerName) + // Sorting by the poolKey is needed so that we can have predictable tests. + var pKeys []poolKey + for pk := range m.targets { + pKeys = append(pKeys, pk) } - sort.Strings(providerNames) - for _, prov := range providerNames { - for _, tg := range m.targets[tsName][prov] { + sort.Sort(byProvider(pKeys)) + + tSetsAll := map[string][]*config.TargetGroup{} + for _, pk := range pKeys { + for _, tg := range m.targets[pk] { if tg.Source != "" { // Don't add empty targets. - tgAll = append(tgAll, tg) + tSetsAll[pk.set] = append(tSetsAll[pk.set], tg) } } } - t := make(map[string][]*config.TargetGroup) - t[tsName] = tgAll - tset <- t + tSets <- tSetsAll } - return <-tset + return <-tSets } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 3e5b41f472..845fc8a873 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -590,7 +590,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { var totalUpdatesCount int for tpName, update := range testCase.updates { provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider) + discoveryManager.startProvider(ctx, poolKey{set: strconv.Itoa(testIndex), provider: tpName}, provider) if len(update) > 0 { totalUpdatesCount = totalUpdatesCount + len(update) @@ -627,19 +627,15 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { } func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tSets map[string]map[string][]*config.TargetGroup, tSetName string, provName, label string, present bool) { - if _, ok := tSets[tSetName]; !ok { - t.Fatalf("'%s' should be present in TargetSets: %v", tSetName, tSets) - return - } - if _, ok := tSets[tSetName][provName]; !ok { - t.Fatalf("'%s' should be present in Discovery providers: %v", provName, tSets[tSetName]) + verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) { + if _, ok := tSets[poolKey]; !ok { + t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) return } match := false var mergedTargets string - for _, targetGroup := range tSets[tSetName][provName] { + for _, targetGroup := range tSets[poolKey] { for _, l := range targetGroup.Targets { mergedTargets = mergedTargets + " " + l.String() @@ -678,8 +674,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: 
"static/0"}, "{__address__=\"bar:9090\"}", true) sTwo := ` scrape_configs: @@ -693,8 +689,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", false) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) } type update struct { From 60ef2016d508545f0e8b575a954899cae37a96e2 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 3 Dec 2017 17:14:08 +0000 Subject: [PATCH 12/13] add a cancel func to the scrape pool as it is needed in the scrape loop select block --- retrieval/scrape.go | 6 +++++- retrieval/scrape_test.go | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 62caadadef..d07663e9e2 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -126,6 +126,7 @@ type scrapePool struct { targets map[uint64]*Target droppedTargets []*Target loops map[uint64]loop + cancel context.CancelFunc // Constructor for new scrape loops. This is settable for testing convenience. newLoop func(*Target, scraper) loop @@ -148,7 +149,9 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) buffers := pool.NewBytesPool(163, 100e6, 3) + ctx, cancel := context.WithCancel(context.Background()) sp := &scrapePool{ + cancel: cancel, appendable: app, config: cfg, client: client, @@ -158,7 +161,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) } sp.newLoop = func(t *Target, s scraper) loop { return newScrapeLoop( - context.Background(), + ctx, s, log.With(logger, "target", t), buffers, @@ -173,6 +176,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) // stop terminates all scrape loops and returns after they all terminated. 
func (sp *scrapePool) stop() { + sp.cancel() var wg sync.WaitGroup sp.mtx.Lock() diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index cc2cfc121e..b3a0466ab2 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -75,6 +75,7 @@ func TestScrapePoolStop(t *testing.T) { sp := &scrapePool{ targets: map[uint64]*Target{}, loops: map[uint64]loop{}, + cancel: func() {}, } var mtx sync.Mutex stopped := map[uint64]bool{} From 587dec9eb970531cddc7f1803d258e72129b5aa0 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 18 Dec 2017 19:41:31 +0000 Subject: [PATCH 13/13] rebased and resolved conflicts with the new Discovery GUI page Signed-off-by: Krasi Georgiev --- cmd/prometheus/main.go | 8 ++++---- discovery/manager.go | 6 +++--- discovery/manager_test.go | 10 +++++----- retrieval/manager.go | 25 +++++++++++++------------ web/web.go | 2 +- 5 files changed, 26 insertions(+), 25 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 757ec5e856..ad580e9979 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -238,11 +238,11 @@ func main() { discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ruleManager := rules.NewManager(&rules.ManagerOptions{ + ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, QueryFunc: rules.EngineQueryFunc(queryEngine), NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), - Context: ctx, + Context: ctxRule, ExternalURL: cfg.web.ExternalURL, Registerer: prometheus.DefaultRegisterer, Logger: log.With(logger, "component", "rule manager"), @@ -271,7 +271,7 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } - // Depend on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager + // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) // Monitor outgoing connections on default transport with conntrack. @@ -281,9 +281,9 @@ func main() { reloaders := []func(cfg *config.Config) error{ remoteStorage.ApplyConfig, - discoveryManager.ApplyConfig, webHandler.ApplyConfig, notifier.ApplyConfig, + discoveryManager.ApplyConfig, scrapeManager.ApplyConfig, func(cfg *config.Config) error { // Get all rule files matching the configuration oaths. diff --git a/discovery/manager.go b/discovery/manager.go index dbfc0bda48..a76676a576 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -53,7 +53,7 @@ type Discoverer interface { } type poolKey struct { - set string + setName string provider string } @@ -111,7 +111,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(ctx, poolKey{set: scfg.JobName, provider: provName}, prov) + m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov) } } close(err) @@ -184,7 +184,7 @@ func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup { for _, pk := range pKeys { for _, tg := range m.targets[pk] { if tg.Source != "" { // Don't add empty targets. 
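The cancel function added to scrapePool gives stop() a single switch to flip: cancel the pool-level context that every scrape loop was created with, then wait for the loops to drain. The sketch below shows only that shutdown shape with stand-in types; the real stop() also stops each loop explicitly and synchronises on its own WaitGroup.

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    type pool struct {
        cancel context.CancelFunc
        wg     sync.WaitGroup
    }

    func newPool(loops int) *pool {
        ctx, cancel := context.WithCancel(context.Background())
        p := &pool{cancel: cancel}
        for i := 0; i < loops; i++ {
            p.wg.Add(1)
            go func(i int) {
                defer p.wg.Done()
                <-ctx.Done() // a real scrape loop also selects on its scrape ticker
                fmt.Println("loop", i, "stopped")
            }(i)
        }
        return p
    }

    func (p *pool) stop() {
        p.cancel() // the one line this patch adds to the shutdown path
        p.wg.Wait()
    }

    func main() {
        p := newPool(3)
        p.stop()
    }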
- tSetsAll[pk.set] = append(tSetsAll[pk.set], tg) + tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) } } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 845fc8a873..748d19f18e 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -590,7 +590,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { var totalUpdatesCount int for tpName, update := range testCase.updates { provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(ctx, poolKey{set: strconv.Itoa(testIndex), provider: tpName}, provider) + discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider) if len(update) > 0 { totalUpdatesCount = totalUpdatesCount + len(update) @@ -674,8 +674,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) sTwo := ` scrape_configs: @@ -689,8 +689,8 @@ scrape_configs: discoveryManager.ApplyConfig(cfg) _ = <-discoveryManager.SyncCh() - verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) } type update struct { diff --git a/retrieval/manager.go b/retrieval/manager.go index e1bfc0ff99..7d9de9445a 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -92,20 +92,21 @@ func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { } // TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (tm *TargetManager) TargetMap() map[string][]*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targetsMap := make(map[string][]*Target) - for jobName, ps := range tm.targetSets { - ps.sp.mtx.RLock() - for _, t := range ps.sp.targets { - targetsMap[jobName] = append(targetsMap[jobName], t) +func (m *ScrapeManager) TargetMap() map[string][]*Target { + targetsMap := make(chan map[string][]*Target) + m.actionCh <- func() { + targets := make(map[string][]*Target) + for jobName, sp := range m.scrapePools { + sp.mtx.RLock() + for _, t := range sp.targets { + targets[jobName] = append(targets[jobName], t) + } + targets[jobName] = append(targets[jobName], sp.droppedTargets...) + sp.mtx.RUnlock() } - targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...) - ps.sp.mtx.RUnlock() + targetsMap <- targets } - return targetsMap + return <-targetsMap } // Targets returns the targets currently being scraped. 
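The rewritten TargetMap is the read-side counterpart of the action-channel pattern: the caller sends a closure into the ScrapeManager's run loop and blocks on a reply channel until the snapshot of active and dropped targets comes back. A simplified sketch with string targets and an invented scrapeMgr type; the per-pool mutex and the real map[string][]*Target return type are omitted here.

    package main

    import "fmt"

    type pool struct{ targets, dropped []string }

    type scrapeMgr struct {
        actionCh chan func()
        pools    map[string]*pool
    }

    func (m *scrapeMgr) run() {
        for f := range m.actionCh {
            f()
        }
    }

    // targetMap mirrors the shape of the new TargetMap: build the snapshot
    // inside the run loop and hand it back over a channel.
    func (m *scrapeMgr) targetMap() map[string][]string {
        reply := make(chan map[string][]string)
        m.actionCh <- func() {
            out := make(map[string][]string, len(m.pools))
            for job, p := range m.pools {
                out[job] = append(append(out[job], p.targets...), p.dropped...)
            }
            reply <- out
        }
        return <-reply
    }

    func main() {
        m := &scrapeMgr{
            actionCh: make(chan func()),
            pools:    map[string]*pool{"prometheus": {targets: []string{"foo:9090"}, dropped: []string{"bar:9090"}}},
        }
        go m.run()
        fmt.Println(m.targetMap())
    }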
diff --git a/web/web.go b/web/web.go index 6dfe67a399..74752f72ef 100644 --- a/web/web.go +++ b/web/web.go @@ -587,7 +587,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) { func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { var index []string - targets := h.targetManager.TargetMap() + targets := h.scrapeManager.TargetMap() for job := range targets { index = append(index, job) }
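Finally, the web handler above only needs the job names from TargetMap to build its service-discovery index. A tiny sketch of that consumption, with targets reduced to strings and an added sort for stable page order (the sort is an assumption of the sketch, not something this hunk shows):

    package main

    import (
        "fmt"
        "sort"
    )

    // jobIndex collects the scrape job names from a TargetMap-shaped result,
    // as the serviceDiscovery handler does when building its page index.
    func jobIndex(targets map[string][]string) []string {
        index := make([]string, 0, len(targets))
        for job := range targets {
            index = append(index, job)
        }
        sort.Strings(index)
        return index
    }

    func main() {
        fmt.Println(jobIndex(map[string][]string{
            "prometheus": {"localhost:9090"},
            "node":       {"node1:9100", "node2:9100"},
        }))
    }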