diff --git a/discovery/manager.go b/discovery/manager.go index 6e9bab1d7c..24950d9d59 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -57,6 +57,8 @@ func (p *Provider) Discoverer() Discoverer { // IsStarted return true if Discoverer is started. func (p *Provider) IsStarted() bool { + p.mu.RLock() + defer p.mu.RUnlock() return p.cancel != nil } @@ -216,15 +218,22 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { newProviders []*Provider ) for _, prov := range m.providers { - // Cancel obsolete providers. - if len(prov.newSubs) == 0 { + // Cancel obsolete providers if it has no new subs and it has a cancel function. + // prov.cancel != nil is the same check as we use in IsStarted() method but we don't call IsStarted + // here because it would take a lock and we need the same lock ourselves for other reads. + prov.mu.RLock() + if len(prov.newSubs) == 0 && prov.cancel != nil { wg.Add(1) prov.done = func() { wg.Done() } + prov.cancel() + prov.mu.RUnlock() continue } + prov.mu.RUnlock() + newProviders = append(newProviders, prov) // refTargets keeps reference targets used to populate new subs' targets as they should be the same. var refTargets map[string]*targetgroup.Group @@ -298,7 +307,9 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) { ctx, cancel := context.WithCancel(ctx) updates := make(chan []*targetgroup.Group) + p.mu.Lock() p.cancel = cancel + p.mu.Unlock() go p.d.Run(ctx, updates) go m.updater(ctx, p, updates) @@ -306,16 +317,20 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) { // cleaner cleans resources associated with provider. func (m *Manager) cleaner(p *Provider) { - p.mu.RLock() + p.mu.Lock() + defer p.mu.Unlock() + m.targetsMtx.Lock() for s := range p.subs { delete(m.targets, poolKey{s, p.name}) } m.targetsMtx.Unlock() - p.mu.RUnlock() if p.done != nil { p.done() } + + // Provider was cleaned so mark is as down. + p.cancel = nil } func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) { @@ -380,9 +395,11 @@ func (m *Manager) cancelDiscoverers() { m.mtx.RLock() defer m.mtx.RUnlock() for _, p := range m.providers { + p.mu.RLock() if p.cancel != nil { p.cancel() } + p.mu.RUnlock() } }