Enhancement: Reload all scrape pools concurrently (#16595)

* Reload all scrape pools concurrently

At the moment all scrape pools that need to be reloaded are reloaded one by one, and mtxScrape stays locked while those reloads are ongoing.
For each pool that's being reloaded we need to wait until all of its targets are updated.
This whole process can take a while, and the more scrape pools there are to reload, the longer it takes.
At the same time, all pools are independent, so there's no real reason to reload them one by one.
Reload each pool in a separate goroutine so we finish the config reload as soon as possible and unlock mtxScrape. A sketch of the pattern follows below.
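
To make the idea concrete, here is a minimal, self-contained sketch of the fan-out pattern this change applies. The pool names and the reloadPool function are made up for illustration, and it uses the standard library's sync/atomic rather than the go.uber.org/atomic package the actual change imports:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// reloadPool stands in for scrapePool.reload: it blocks until all
// targets of one pool have picked up the new configuration.
// (Hypothetical helper, for illustration only.)
func reloadPool(name string) error {
	time.Sleep(100 * time.Millisecond) // simulate waiting on targets
	return nil
}

func main() {
	pools := []string{"node", "kubelet", "blackbox"} // made-up pool names

	var (
		failed atomic.Bool
		wg     sync.WaitGroup
	)
	// Fan out: each pool reloads in its own goroutine, so the total
	// wall-clock time is roughly the slowest pool, not the sum of all.
	for _, name := range pools {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if err := reloadPool(name); err != nil {
				failed.Store(true) // atomic: safe from many goroutines
			}
		}(name)
	}
	wg.Wait() // mtxScrape would be held only for this long

	fmt.Println("any reload failed:", failed.Load())
}
```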

Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>

* Address PR review feedback

Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>

---------

Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>
Łukasz Mierzwa, 2025-06-09 15:01:35 +01:00, committed by GitHub
commit c528293376 (parent 8fc1750bcc)

```diff
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -26,6 +26,7 @@ import (
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/promslog"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -287,29 +288,46 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
 	}
 
 	// Cleanup and reload pool if the configuration has changed.
-	var failed bool
-	for name, sp := range m.scrapePools {
-		switch cfg, ok := m.scrapeConfigs[name]; {
-		case !ok:
-			sp.stop()
-			delete(m.scrapePools, name)
-		case !reflect.DeepEqual(sp.config, cfg):
-			err := sp.reload(cfg)
-			if err != nil {
-				m.logger.Error("error reloading scrape pool", "err", err, "scrape_pool", name)
-				failed = true
-			}
-			fallthrough
-		case ok:
-			if l, ok := m.scrapeFailureLoggers[cfg.ScrapeFailureLogFile]; ok {
-				sp.SetScrapeFailureLogger(l)
-			} else {
-				sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", name)
-			}
-		}
-	}
+	var (
+		failed   atomic.Bool
+		wg       sync.WaitGroup
+		toDelete sync.Map // Stores the list of names of pools to delete.
+	)
+	for poolName, pool := range m.scrapePools {
+		wg.Add(1)
+		cfg, ok := m.scrapeConfigs[poolName]
+		// Reload each scrape pool in a dedicated goroutine so we don't have to wait a long time
+		// if we have a lot of scrape pools to update.
+		go func(name string, sp *scrapePool, cfg *config.ScrapeConfig, ok bool) {
+			defer wg.Done()
+			switch {
+			case !ok:
+				sp.stop()
+				toDelete.Store(name, struct{}{})
+			case !reflect.DeepEqual(sp.config, cfg):
+				err := sp.reload(cfg)
+				if err != nil {
+					m.logger.Error("error reloading scrape pool", "err", err, "scrape_pool", name)
+					failed.Store(true)
+				}
+				fallthrough
+			case ok:
+				if l, ok := m.scrapeFailureLoggers[cfg.ScrapeFailureLogFile]; ok {
+					sp.SetScrapeFailureLogger(l)
+				} else {
+					sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", name)
+				}
+			}
+		}(poolName, pool, cfg, ok)
+	}
+	wg.Wait()
+	toDelete.Range(func(name, _ any) bool {
+		delete(m.scrapePools, name.(string))
+		return true
+	})
 
-	if failed {
+	if failed.Load() {
 		return errors.New("failed to apply the new configuration")
 	}
 	return nil
```
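
A note on the shape of the change: the goroutines cannot call delete(m.scrapePools, name) directly, because they run while the for ... range m.scrapePools loop is still iterating, and Go maps are not safe for concurrent mutation. Instead each goroutine records the name in a sync.Map, and the deletions are replayed single-threaded after wg.Wait(). A minimal, standalone sketch of that collect-then-delete pattern (made-up data, not the Prometheus code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// keep=false marks pools whose config disappeared, for illustration.
	pools := map[string]bool{"a": true, "b": false, "c": true}

	var (
		wg       sync.WaitGroup
		toDelete sync.Map // names recorded concurrently, deleted later
	)
	for name, keep := range pools {
		wg.Add(1)
		go func(name string, keep bool) {
			defer wg.Done()
			if !keep {
				toDelete.Store(name, struct{}{}) // safe from many goroutines
			}
		}(name, keep)
	}
	wg.Wait()

	// Only one goroutine is running now, so plain map deletion is safe.
	toDelete.Range(func(name, _ any) bool {
		delete(pools, name.(string))
		return true
	})

	fmt.Println(pools) // map[a:true c:true]
}
```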