mirror of
https://github.com/prometheus/prometheus.git
synced 2025-08-05 21:57:09 +02:00
Fix a race in discovery manager ApplyConfig & shutdown
If we call ApplyConfig() at the same time the manager is being stopped we might end up hanging forever. This is because ApplyConfig() will try to cancel obsolete providers and wait until they are cancelled. It's done by setting a done() function that call Done() on a sync.WaitGroup: ``` if len(prov.newSubs) == 0 { wg.Add(1) prov.done = func() { wg.Done() } } ``` then calling prov.cancel() and finally waiting until all providers run done() function that by blocking it all on a wg.Wait() call. For each provider there is a goroutine created by calling Manager.startProvider(*Provider): ``` func (m *Manager) startProvider(ctx context.Context, p *Provider) { m.logger.Debug("Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) ctx, cancel := context.WithCancel(ctx) updates := make(chan []*targetgroup.Group) p.mu.Lock() p.cancel = cancel p.mu.Unlock() go p.d.Run(ctx, updates) go m.updater(ctx, p, updates) } ``` It creates a context that can be cancelled and that cancel function becomes prov.cancel. This is what ApplyConfig will call. If we look at the body of updater() method: ``` func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) { // Ensure targets from this provider are cleaned up. defer m.cleaner(p) for { select { case <-ctx.Done(): return [...] ``` we can see that it will exit if that context is cancelled and that will trigger a call to Manager.cleaner(). That cleaner() is where done() is called. So ApplyConfig() -> calls cancel() -> causes cleaner() to be executed -> calls done(). cancel() is also called from cancelDiscoverers() method that will be called by Manager.Run() when Manager is stopping: ``` func (m *Manager) Run() error { go m.sender() <-m.ctx.Done() m.cancelDiscoverers() return m.ctx.Err() } ``` The problem is that if we call both ApplyConfig and stop the manager at the same time we might end up with: - We call Manager.ApplyConfig() - We stop the Manager - Manager.cancelDiscoverers() is called - Provider.cancel() is called for every Provider - cancel() causes provider context to be cancelled which terminates updater() for given Provider - cancelling context causes cleaner() method to be called for given Provider - cleaner() calls done() and exits - Provider is considered stopped at this point, there is no goroutine running that will call done() anymore - ApplyConfig iterates providers and decides that one is obsolete is must be stopped - It sets a custom done() function body with a WaitGroup.Done() call in it - Then ApplyConfig waits until all Providers run done() - But they are all stopped and no done() will be run - We wait forever This only happens if cancelDiscoverers() is run before ApplyConfig, if ApplyConfig runs first done() will be called, if cancelDiscoverers() is called first it will stop updater() instances and so done() won't be called anymore. Part of the problem is that there is no distinction between running and stopped providers. There is Provider.IsStarted() method that returns a bool based on the value of cancel function but ApplyConfig doesn't check it. Second problem is that although there is a mutex on a Provider it's used much in the code, so two goroutines can try to read and/or write provider.cancel and/or provider.done at the same time, making it all more likely to race. The easiest way to fix it is to check if the provider is started inside ApplyConfig so we don't try to stop a provider that's already stopped. For that we need to mark it as stopped after cancel() is called, by setting cancel to nil. This also needs better lock usage to avoid different parts of the code trying to set cancel and done at the same time. Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>
This commit is contained in:
parent
357e652044
commit
b49d143595
@ -57,6 +57,8 @@ func (p *Provider) Discoverer() Discoverer {
|
||||
|
||||
// IsStarted return true if Discoverer is started.
|
||||
func (p *Provider) IsStarted() bool {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.cancel != nil
|
||||
}
|
||||
|
||||
@ -216,15 +218,22 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
||||
newProviders []*Provider
|
||||
)
|
||||
for _, prov := range m.providers {
|
||||
// Cancel obsolete providers.
|
||||
if len(prov.newSubs) == 0 {
|
||||
// Cancel obsolete providers if it has no new subs and it has a cancel function.
|
||||
// prov.cancel != nil is the same check as we use in IsStarted() method but we don't call IsStarted
|
||||
// here because it would take a lock and we need the same lock ourselves for other reads.
|
||||
prov.mu.RLock()
|
||||
if len(prov.newSubs) == 0 && prov.cancel != nil {
|
||||
wg.Add(1)
|
||||
prov.done = func() {
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
prov.cancel()
|
||||
prov.mu.RUnlock()
|
||||
continue
|
||||
}
|
||||
prov.mu.RUnlock()
|
||||
|
||||
newProviders = append(newProviders, prov)
|
||||
// refTargets keeps reference targets used to populate new subs' targets as they should be the same.
|
||||
var refTargets map[string]*targetgroup.Group
|
||||
@ -298,7 +307,9 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
updates := make(chan []*targetgroup.Group)
|
||||
|
||||
p.mu.Lock()
|
||||
p.cancel = cancel
|
||||
p.mu.Unlock()
|
||||
|
||||
go p.d.Run(ctx, updates)
|
||||
go m.updater(ctx, p, updates)
|
||||
@ -306,16 +317,20 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
|
||||
|
||||
// cleaner cleans resources associated with provider.
|
||||
func (m *Manager) cleaner(p *Provider) {
|
||||
p.mu.RLock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
m.targetsMtx.Lock()
|
||||
for s := range p.subs {
|
||||
delete(m.targets, poolKey{s, p.name})
|
||||
}
|
||||
m.targetsMtx.Unlock()
|
||||
p.mu.RUnlock()
|
||||
if p.done != nil {
|
||||
p.done()
|
||||
}
|
||||
|
||||
// Provider was cleaned so mark is as down.
|
||||
p.cancel = nil
|
||||
}
|
||||
|
||||
func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
|
||||
@ -380,9 +395,11 @@ func (m *Manager) cancelDiscoverers() {
|
||||
m.mtx.RLock()
|
||||
defer m.mtx.RUnlock()
|
||||
for _, p := range m.providers {
|
||||
p.mu.RLock()
|
||||
if p.cancel != nil {
|
||||
p.cancel()
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user