diff --git a/notifier/notifier.go b/notifier/notifier.go
index 4cf376aa05..a375a0749c 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -298,25 +298,14 @@ func (n *Manager) nextBatch() []*Alert {
 	return alerts
 }
 
-// Run dispatches notifications continuously.
-func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+// sendLoop continuously consumes the notifications queue and sends alerts to
+// the configured Alertmanagers.
+func (n *Manager) sendLoop() {
 	for {
-		// The select is split in two parts, such as we will first try to read
-		// new alertmanager targets if they are available, before sending new
-		// alerts.
 		select {
 		case <-n.ctx.Done():
 			return
-		case ts := <-tsets:
-			n.reload(ts)
-		default:
-			select {
-			case <-n.ctx.Done():
-				return
-			case ts := <-tsets:
-				n.reload(ts)
-			case <-n.more:
-			}
+		case <-n.more:
 		}
 
 		alerts := n.nextBatch()
@@ -330,6 +319,21 @@ func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
 	}
 }
 
+// Run receives updates of target groups and triggers a reload.
+// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates.
+// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
+func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+	go n.sendLoop()
+	for {
+		select {
+		case <-n.ctx.Done():
+			return
+		case ts := <-tsets:
+			n.reload(ts)
+		}
+	}
+}
+
 func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
 	n.mtx.Lock()
 	defer n.mtx.Unlock()
@@ -483,6 +487,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
 		ams.mtx.RLock()
 
+		if len(ams.cfg.AlertRelabelConfigs) > 0 {
 			amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
 			if len(amAlerts) == 0 {
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 5c82decbe0..03290a58ca 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -701,125 +701,10 @@ func TestLabelsToOpenAPILabelSet(t *testing.T) {
 	require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.FromStrings("aaa", "111", "bbb", "222")))
 }
 
-// TestHangingNotifier validates that targets updates happen even when there are
-// queued alerts.
-func TestHangingNotifier(t *testing.T) {
-	// Note: When targets are not updated in time, this test is flaky because go
-	// selects are not deterministic. Therefore we run 10 subtests to run into the issue.
-	for i := 0; i < 10; i++ {
-		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			var (
-				done    = make(chan struct{})
-				changed = make(chan struct{})
-				syncCh  = make(chan map[string][]*targetgroup.Group)
-			)
-
-			defer func() {
-				close(done)
-			}()
-
-			var calledOnce bool
-			// Setting up a bad server. This server hangs for 2 seconds.
-			badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				if calledOnce {
-					t.Fatal("hanging server called multiple times")
-				}
-				calledOnce = true
-				select {
-				case <-done:
-				case <-time.After(2 * time.Second):
-				}
-			}))
-			badURL, err := url.Parse(badServer.URL)
-			require.NoError(t, err)
-			badAddress := badURL.Host // Used for __name__ label in targets.
-
-			// Setting up a bad server. This server returns fast, signaling requests on
-			// by closing the changed channel.
-			goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				close(changed)
-			}))
-			goodURL, err := url.Parse(goodServer.URL)
-			require.NoError(t, err)
-			goodAddress := goodURL.Host // Used for __name__ label in targets.
-
-			h := NewManager(
-				&Options{
-					QueueCapacity: 20 * maxBatchSize,
-				},
-				nil,
-			)
-
-			h.alertmanagers = make(map[string]*alertmanagerSet)
-
-			am1Cfg := config.DefaultAlertmanagerConfig
-			am1Cfg.Timeout = model.Duration(200 * time.Millisecond)
-
-			h.alertmanagers["config-0"] = &alertmanagerSet{
-				ams:     []alertmanager{},
-				cfg:     &am1Cfg,
-				metrics: h.metrics,
-			}
-			go h.Run(syncCh)
-			defer h.Stop()
-
-			var alerts []*Alert
-			for i := range make([]struct{}, 20*maxBatchSize) {
-				alerts = append(alerts, &Alert{
-					Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
-				})
-			}
-
-			// Injecting the hanging server URL.
-			syncCh <- map[string][]*targetgroup.Group{
-				"config-0": {
-					{
-						Targets: []model.LabelSet{
-							{
-								model.AddressLabel: model.LabelValue(badAddress),
-							},
-						},
-					},
-				},
-			}
-
-			// Queing alerts.
-			h.Send(alerts...)
-
-			// Updating with a working alertmanager target.
-			go func() {
-				select {
-				case syncCh <- map[string][]*targetgroup.Group{
-					"config-0": {
-						{
-							Targets: []model.LabelSet{
-								{
-									model.AddressLabel: model.LabelValue(goodAddress),
-								},
-							},
-						},
-					},
-				}:
-				case <-done:
-				}
-			}()
-
-			select {
-			case <-time.After(1 * time.Second):
-				t.Fatalf("Timeout after 1 second, targets not synced in time.")
-			case <-changed:
-				// The good server has been hit in less than 3 seconds, therefore
-				// targets have been updated before a second call could be made to the
-				// bad server.
-			}
-		})
-	}
-}
-
-// TODO: renameit and even replace TestHangingNotifier with it.
-// TestHangingNotifierXXX ensures that the notifier takes into account SD changes even when there are
+// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
 // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
-func TestHangingNotifierXXX(t *testing.T) {
+// It also covers https://github.com/prometheus/prometheus/issues/8768.
+func TestHangingNotifier(t *testing.T) {
 	const (
 		batches     = 100
 		alertsCount = maxBatchSize * batches
@@ -857,6 +742,8 @@ func TestHangingNotifierXXX(t *testing.T) {
 	require.NoError(t, err)
 
 	// Initialize the discovery manager
+	// This is relevant because, in real life, updates aren't sent continuously but at most once per updatert interval.
+	// The old implementation of TestHangingNotifier didn't take that into account.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	reg := prometheus.NewRegistry()