Merge pull request #16820 from prymitive/discoveryRace

discovery: fix a race in ApplyConfig while Prometheus is being stopped
This commit is contained in:
Julien 2025-07-03 10:52:59 +02:00 committed by GitHub
commit 011c7fe87d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 71 additions and 4 deletions

View File

@ -57,6 +57,8 @@ func (p *Provider) Discoverer() Discoverer {
// IsStarted return true if Discoverer is started. // IsStarted return true if Discoverer is started.
func (p *Provider) IsStarted() bool { func (p *Provider) IsStarted() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.cancel != nil return p.cancel != nil
} }
@ -216,15 +218,22 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
newProviders []*Provider newProviders []*Provider
) )
for _, prov := range m.providers { for _, prov := range m.providers {
// Cancel obsolete providers. // Cancel obsolete providers if it has no new subs and it has a cancel function.
if len(prov.newSubs) == 0 { // prov.cancel != nil is the same check as we use in IsStarted() method but we don't call IsStarted
// here because it would take a lock and we need the same lock ourselves for other reads.
prov.mu.RLock()
if len(prov.newSubs) == 0 && prov.cancel != nil {
wg.Add(1) wg.Add(1)
prov.done = func() { prov.done = func() {
wg.Done() wg.Done()
} }
prov.cancel() prov.cancel()
prov.mu.RUnlock()
continue continue
} }
prov.mu.RUnlock()
newProviders = append(newProviders, prov) newProviders = append(newProviders, prov)
// refTargets keeps reference targets used to populate new subs' targets as they should be the same. // refTargets keeps reference targets used to populate new subs' targets as they should be the same.
var refTargets map[string]*targetgroup.Group var refTargets map[string]*targetgroup.Group
@ -298,7 +307,9 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group) updates := make(chan []*targetgroup.Group)
p.mu.Lock()
p.cancel = cancel p.cancel = cancel
p.mu.Unlock()
go p.d.Run(ctx, updates) go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates) go m.updater(ctx, p, updates)
@ -306,16 +317,20 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
// cleaner cleans resources associated with provider. // cleaner cleans resources associated with provider.
func (m *Manager) cleaner(p *Provider) { func (m *Manager) cleaner(p *Provider) {
p.mu.RLock() p.mu.Lock()
defer p.mu.Unlock()
m.targetsMtx.Lock() m.targetsMtx.Lock()
for s := range p.subs { for s := range p.subs {
delete(m.targets, poolKey{s, p.name}) delete(m.targets, poolKey{s, p.name})
} }
m.targetsMtx.Unlock() m.targetsMtx.Unlock()
p.mu.RUnlock()
if p.done != nil { if p.done != nil {
p.done() p.done()
} }
// Provider was cleaned so mark it as down.
p.cancel = nil
} }
func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) { func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
@ -380,9 +395,11 @@ func (m *Manager) cancelDiscoverers() {
m.mtx.RLock() m.mtx.RLock()
defer m.mtx.RUnlock() defer m.mtx.RUnlock()
for _, p := range m.providers { for _, p := range m.providers {
p.mu.RLock()
if p.cancel != nil { if p.cancel != nil {
p.cancel() p.cancel()
} }
p.mu.RUnlock()
} }
} }

View File

@ -1562,3 +1562,53 @@ func TestUnregisterMetrics(t *testing.T) {
cancel() cancel()
} }
} }
// Calling ApplyConfig() that removes providers at the same time as shutting down
// the manager should not hang.
func TestConfigReloadAndShutdownRace(t *testing.T) {
	reg := prometheus.NewRegistry()
	_, sdMetrics := NewTestMetrics(t, reg)
	mgrCtx, mgrCancel := context.WithCancel(context.Background())
	discoveryManager := NewManager(mgrCtx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	var wgDiscovery sync.WaitGroup
	wgDiscovery.Add(1)
	go func() {
		discoveryManager.Run()
		wgDiscovery.Done()
	}()
	// Give the manager's update loop a moment to start before applying config.
	time.Sleep(200 * time.Millisecond)
	// Drain the sync channel in the background so ApplyConfig is not blocked
	// on an unread update.
	var wgBg sync.WaitGroup
	updateChan := discoveryManager.SyncCh()
	wgBg.Add(1)
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer wgBg.Done()
		select {
		case <-ctx.Done():
			return
		case <-updateChan:
		}
	}()
	c := map[string]Configs{
		"prometheus": {staticConfig("bar:9090")},
	}
	require.NoError(t, discoveryManager.ApplyConfig(c))
	// Apply an empty config (removing the provider) concurrently with manager
	// shutdown to exercise the race between ApplyConfig and cancelDiscoverers.
	delete(c, "prometheus")
	wgBg.Add(1)
	go func() {
		defer wgBg.Done()
		// Error intentionally discarded: require must not be called from a
		// non-test goroutine, and this call deliberately races with shutdown.
		_ = discoveryManager.ApplyConfig(c)
	}()
	mgrCancel()
	// The test passes if this does not hang: Run() must return after cancel
	// even while the concurrent ApplyConfig is in flight.
	wgDiscovery.Wait()
	cancel()
	wgBg.Wait()
}