diff --git a/discovery/manager.go b/discovery/manager.go
index 6e9bab1d7c..24950d9d59 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -57,6 +57,8 @@ func (p *Provider) Discoverer() Discoverer {
 
 // IsStarted return true if Discoverer is started.
 func (p *Provider) IsStarted() bool {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 	return p.cancel != nil
 }
 
@@ -216,15 +218,22 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
 		newProviders []*Provider
 	)
 	for _, prov := range m.providers {
-		// Cancel obsolete providers.
-		if len(prov.newSubs) == 0 {
+		// Cancel an obsolete provider if it has no new subs and it has a cancel function.
+		// prov.cancel != nil is the same check as we use in the IsStarted() method, but we don't call IsStarted
+		// here because it would take a lock and we need the same lock ourselves for other reads.
+		prov.mu.RLock()
+		if len(prov.newSubs) == 0 && prov.cancel != nil {
 			wg.Add(1)
 			prov.done = func() {
 				wg.Done()
 			}
+
 			prov.cancel()
+			prov.mu.RUnlock()
 			continue
 		}
+		prov.mu.RUnlock()
+
 		newProviders = append(newProviders, prov)
 		// refTargets keeps reference targets used to populate new subs' targets as they should be the same.
 		var refTargets map[string]*targetgroup.Group
@@ -298,7 +307,9 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
 	ctx, cancel := context.WithCancel(ctx)
 	updates := make(chan []*targetgroup.Group)
 
+	p.mu.Lock()
 	p.cancel = cancel
+	p.mu.Unlock()
 
 	go p.d.Run(ctx, updates)
 	go m.updater(ctx, p, updates)
@@ -306,16 +317,20 @@ func (m *Manager) startProvider(ctx context.Context, p *Provider) {
 
 // cleaner cleans resources associated with provider.
 func (m *Manager) cleaner(p *Provider) {
-	p.mu.RLock()
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
 	m.targetsMtx.Lock()
 	for s := range p.subs {
 		delete(m.targets, poolKey{s, p.name})
 	}
 	m.targetsMtx.Unlock()
-	p.mu.RUnlock()
 	if p.done != nil {
 		p.done()
 	}
+
+	// Provider was cleaned so mark it as down.
+	p.cancel = nil
 }
 
 func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
@@ -380,9 +395,11 @@ func (m *Manager) cancelDiscoverers() {
 	m.mtx.RLock()
 	defer m.mtx.RUnlock()
 	for _, p := range m.providers {
+		p.mu.RLock()
 		if p.cancel != nil {
 			p.cancel()
 		}
+		p.mu.RUnlock()
 	}
 }
 
diff --git a/discovery/manager_test.go b/discovery/manager_test.go
index 1dd10baf47..38a93be9f4 100644
--- a/discovery/manager_test.go
+++ b/discovery/manager_test.go
@@ -1562,3 +1562,53 @@ func TestUnregisterMetrics(t *testing.T) {
 		cancel()
 	}
 }
+
+// Calling ApplyConfig() that removes providers at the same time as shutting down
+// the manager should not hang.
+func TestConfigReloadAndShutdownRace(t *testing.T) {
+	reg := prometheus.NewRegistry()
+	_, sdMetrics := NewTestMetrics(t, reg)
+
+	mgrCtx, mgrCancel := context.WithCancel(context.Background())
+	discoveryManager := NewManager(mgrCtx, promslog.NewNopLogger(), reg, sdMetrics)
+	require.NotNil(t, discoveryManager)
+	discoveryManager.updatert = 100 * time.Millisecond
+
+	var wgDiscovery sync.WaitGroup
+	wgDiscovery.Add(1)
+	go func() {
+		discoveryManager.Run()
+		wgDiscovery.Done()
+	}()
+	time.Sleep(time.Millisecond * 200)
+
+	var wgBg sync.WaitGroup
+	updateChan := discoveryManager.SyncCh()
+	wgBg.Add(1)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		defer wgBg.Done()
+		select {
+		case <-ctx.Done():
+			return
+		case <-updateChan:
+		}
+	}()
+
+	c := map[string]Configs{
+		"prometheus": {staticConfig("bar:9090")},
+	}
+	discoveryManager.ApplyConfig(c)
+
+	delete(c, "prometheus")
+	wgBg.Add(1)
+	go func() {
+		discoveryManager.ApplyConfig(c)
+		wgBg.Done()
+	}()
+	mgrCancel()
+	wgDiscovery.Wait()
+
+	cancel()
+	wgBg.Wait()
+}
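Note: the essence of the patch is that Provider.cancel is only ever read or written while holding Provider.mu, and cleaner() nils it out under the write lock so IsStarted() reports the provider as stopped. Below is a minimal, self-contained sketch of that locking pattern, assuming a simplified stand-in type; the provider struct and its Start/Stop helpers here are illustrative only and are not the real discovery.Provider API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// provider is a simplified stand-in for discovery.Provider (not the real type):
// the cancel function is only touched while holding mu, mirroring the patch.
type provider struct {
	mu     sync.RWMutex
	cancel context.CancelFunc
}

// Start stores the cancel function under the write lock, as startProvider now does.
func (p *provider) Start(ctx context.Context) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	p.mu.Lock()
	p.cancel = cancel
	p.mu.Unlock()
	return ctx
}

// IsStarted reads cancel under the read lock, as the patched IsStarted does.
func (p *provider) IsStarted() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.cancel != nil
}

// Stop cancels the provider (if running) and marks it as down, combining what
// ApplyConfig/cancelDiscoverers and cleaner do in the patch.
func (p *provider) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cancel != nil {
		p.cancel()
		p.cancel = nil
	}
}

func main() {
	p := &provider{}
	ctx := p.Start(context.Background())

	// Concurrent Stop calls (config reload vs. shutdown) are serialized by mu
	// instead of racing on an unguarded cancel field.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.Stop()
		}()
	}
	wg.Wait()

	select {
	case <-ctx.Done():
		fmt.Println("provider stopped, IsStarted:", p.IsStarted())
	case <-time.After(time.Second):
		fmt.Println("context was not cancelled")
	}
}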