From 2d5d2398831ed68874a2bc5213abdc3fb2db718d Mon Sep 17 00:00:00 2001
From: Siavash Safi
Date: Tue, 10 Jun 2025 11:58:07 +0200
Subject: [PATCH] feat(notifier): independent alertmanager queues

Independent Alertmanager queues prevent the shared queue from overflowing
when one or more Alertmanager instances are unavailable, which could
previously result in lost alert notifications.
The buffered queues are managed per AlertmanagerSet and are dynamically
added/removed with service discovery or configuration reload.

The following metrics now include an extra dimension for the
alertmanager label:
- prometheus_notifications_dropped_total
- prometheus_notifications_queue_capacity
- prometheus_notifications_queue_length

This change also includes the test from #14099

Closes #7676

Signed-off-by: Siavash Safi
---
 notifier/alert.go                |  17 +-
 notifier/alertmanagerset.go      | 174 +++++++++++++-
 notifier/alertmanagerset_test.go |  53 +++++
 notifier/buffer.go               | 124 ++++++++++
 notifier/buffer_test.go          | 130 +++++++++++
 notifier/manager.go              | 318 ++++----------------------
 notifier/manager_test.go         | 377 ++++++++++++++++++++++++-------
 notifier/metric.go               |  22 +-
 8 files changed, 845 insertions(+), 370 deletions(-)
 create mode 100644 notifier/alertmanagerset_test.go
 create mode 100644 notifier/buffer.go
 create mode 100644 notifier/buffer_test.go

diff --git a/notifier/alert.go b/notifier/alert.go
index 72f313c25e..bc03121ac2 100644
--- a/notifier/alert.go
+++ b/notifier/alert.go
@@ -68,11 +68,26 @@ func (a *Alert) ResolvedAt(ts time.Time) bool {
 	return !a.EndsAt.After(ts)
 }
 
+// Copy returns a copy of the alert.
+func (a *Alert) Copy() *Alert {
+	return &Alert{
+		Labels:       a.Labels.Copy(),
+		Annotations:  a.Annotations.Copy(),
+		StartsAt:     a.StartsAt,
+		EndsAt:       a.EndsAt,
+		GeneratorURL: a.GeneratorURL,
+	}
+}
+
 func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 	var relabeledAlerts []*Alert
-	for _, a := range alerts {
+	for _, s := range alerts {
+		// Copy the alert to avoid a race condition between multiple alertmanagerSets
+		// holding references to the same alerts.
+		a := s.Copy()
+
 		lb.Reset(a.Labels)
 		externalLabels.Range(func(l labels.Label) {
 			if a.Labels.Get(l.Name) == "" {
diff --git a/notifier/alertmanagerset.go b/notifier/alertmanagerset.go
index bb269d4ed6..2425da91ec 100644
--- a/notifier/alertmanagerset.go
+++ b/notifier/alertmanagerset.go
@@ -14,11 +14,17 @@
 package notifier
 
 import (
+	"bytes"
+	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
 	"log/slog"
 	"net/http"
 	"sync"
+	"time"
 
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/sigv4"
@@ -26,6 +32,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
+	"github.com/prometheus/prometheus/model/labels"
 )
 
 // alertmanagerSet contains a set of Alertmanagers discovered via a group of service
@@ -33,16 +40,18 @@ type alertmanagerSet struct {
 	cfg    *config.AlertmanagerConfig
 	client *http.Client
+	opts   *Options
 
 	metrics *alertMetrics
 
 	mtx        sync.RWMutex
 	ams        []alertmanager
 	droppedAms []alertmanager
+	buffers    map[string]*buffer
 
 	logger *slog.Logger
 }
 
-func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
+func newAlertmanagerSet(cfg *config.AlertmanagerConfig, opts *Options, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager")
 	if err != nil {
 		return nil, err
@@ -61,6 +70,8 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, met
 	s := &alertmanagerSet{
 		client:  client,
 		cfg:     cfg,
+		opts:    opts,
+		buffers: make(map[string]*buffer),
 		logger:  logger,
 		metrics: metrics,
 	}
@@ -98,24 +109,31 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
 			continue
 		}
 
-		// This will initialize the Counters for the AM to 0.
-		s.metrics.sent.WithLabelValues(us)
+		// This will initialize the Counters for the AM to 0 and set the static queue capacity gauge.
+		s.metrics.dropped.WithLabelValues(us)
 		s.metrics.errors.WithLabelValues(us)
+		s.metrics.sent.WithLabelValues(us)
+		s.metrics.queueCapacity.WithLabelValues(us).Set(float64(s.opts.QueueCapacity))
 
 		seen[us] = struct{}{}
 		s.ams = append(s.ams, am)
 	}
 
+	s.startSendLoops(allAms)
+
 	// Now remove counters for any removed Alertmanagers.
 	for _, am := range previousAms {
 		us := am.url().String()
 		if _, ok := seen[us]; ok {
 			continue
 		}
-		s.metrics.latency.DeleteLabelValues(us)
-		s.metrics.sent.DeleteLabelValues(us)
+		s.metrics.dropped.DeleteLabelValues(us)
 		s.metrics.errors.DeleteLabelValues(us)
+		s.metrics.latency.DeleteLabelValues(us)
+		s.metrics.queueLength.DeleteLabelValues(us)
+		s.metrics.sent.DeleteLabelValues(us)
 		seen[us] = struct{}{}
 	}
+
+	s.cleanSendLoops(previousAms)
 }
 
 func (s *alertmanagerSet) configHash() (string, error) {
@@ -126,3 +144,149 @@ func (s *alertmanagerSet) configHash() (string, error) {
 	hash := md5.Sum(b)
 	return hex.EncodeToString(hash[:]), nil
 }
+
+func (s *alertmanagerSet) send(alerts ...*Alert) map[string]int {
+	dropped := make(map[string]int)
+
+	if len(s.cfg.AlertRelabelConfigs) > 0 {
+		alerts = relabelAlerts(s.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
+		if len(alerts) == 0 {
+			return dropped
+		}
+	}
+
+	for am, q := range s.buffers {
+		d := q.push(alerts...)
+		dropped[am] += d
+	}
+
+	return dropped
+}
+
+// startSendLoops creates buffers for newly discovered Alertmanagers and
+// starts a send loop for each.
+// This function expects the caller to acquire needed locks.
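+// Buffers are keyed by the Alertmanager URL, which is also the value used
+// for the alertmanager label on the per-queue metrics.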
+func (s *alertmanagerSet) startSendLoops(all []alertmanager) {
+	for _, am := range all {
+		us := am.url().String()
+		// Create new buffers and start send loops for new Alertmanagers in the set.
+		if _, ok := s.buffers[us]; !ok {
+			s.buffers[us] = newBuffer(s.opts.QueueCapacity)
+			go s.sendLoop(am)
+		}
+	}
+}
+
+// cleanSendLoops stops the send loop for each removed Alertmanager by
+// closing and removing its buffer.
+// This function expects the caller to acquire needed locks.
+func (s *alertmanagerSet) cleanSendLoops(removed []alertmanager) {
+	for _, am := range removed {
+		us := am.url().String()
+		s.buffers[us].close()
+		delete(s.buffers, us)
+	}
+}
+
+func (s *alertmanagerSet) sendLoop(am alertmanager) {
+	url := am.url().String()
+
+	// Allocate an alert batch slice with length and capacity equal to the max batch size.
+	alerts := make([]*Alert, s.opts.MaxBatchSize)
+	for {
+		b := s.getBuffer(url)
+		if b == nil {
+			return
+		}
+
+		_, ok := <-b.hasWork
+		if !ok {
+			return
+		}
+
+		b.pop(&alerts)
+
+		if !s.postNotifications(am, alerts) {
+			s.metrics.dropped.WithLabelValues(url).Add(float64(len(alerts)))
+		}
+	}
+}
+
+func (s *alertmanagerSet) postNotifications(am alertmanager, alerts []*Alert) bool {
+	if len(alerts) == 0 {
+		return true
+	}
+
+	begin := time.Now()
+
+	var payload []byte
+	var err error
+	switch s.cfg.APIVersion {
+	case config.AlertmanagerAPIVersionV2:
+		{
+			openAPIAlerts := alertsToOpenAPIAlerts(alerts)
+
+			payload, err = json.Marshal(openAPIAlerts)
+			if err != nil {
+				s.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err)
+				return false
+			}
+		}
+
+	default:
+		{
+			s.logger.Error(
+				fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", s.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
+				"err", err,
+			)
+			return false
+		}
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.cfg.Timeout))
+	defer cancel()
+
+	url := am.url().String()
+	if err := s.sendOne(ctx, s.client, url, payload); err != nil {
+		s.logger.Error("Error sending alerts", "alertmanager", url, "count", len(alerts), "err", err)
+		s.metrics.errors.WithLabelValues(url).Add(float64(len(alerts)))
+		return false
+	}
+	s.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
+	s.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
+
+	return true
+}
+
+func (s *alertmanagerSet) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
+	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("User-Agent", userAgent)
+	req.Header.Set("Content-Type", contentTypeJSON)
+	resp, err := s.opts.Do(ctx, c, req)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		io.Copy(io.Discard, resp.Body)
+		resp.Body.Close()
+	}()
+
+	// Any HTTP status 2xx is OK.
+	if resp.StatusCode/100 != 2 {
+		return fmt.Errorf("bad response status %s", resp.Status)
+	}
+
+	return nil
+}
+
+func (s *alertmanagerSet) getBuffer(url string) *buffer {
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+	if q, ok := s.buffers[url]; ok {
+		return q
+	}
+	return nil
+}
diff --git a/notifier/alertmanagerset_test.go b/notifier/alertmanagerset_test.go
new file mode 100644
index 0000000000..3593e375b3
--- /dev/null
+++ b/notifier/alertmanagerset_test.go
@@ -0,0 +1,53 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notifier
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCustomDo(t *testing.T) {
+	const testURL = "http://testurl.com/"
+	const testBody = "testbody"
+
+	var received bool
+	h := alertmanagerSet{
+		opts: &Options{
+			Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
+				received = true
+				body, err := io.ReadAll(req.Body)
+
+				require.NoError(t, err)
+
+				require.Equal(t, testBody, string(body))
+
+				require.Equal(t, testURL, req.URL.String())
+
+				return &http.Response{
+					Body: io.NopCloser(bytes.NewBuffer(nil)),
+				}, nil
+			},
+		},
+	}
+
+	h.sendOne(context.Background(), nil, testURL, []byte(testBody))
+
+	require.True(t, received, "Expected to receive an alert, but didn't")
+}
diff --git a/notifier/buffer.go b/notifier/buffer.go
new file mode 100644
index 0000000000..2d0aa64e6c
--- /dev/null
+++ b/notifier/buffer.go
@@ -0,0 +1,124 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notifier
+
+import (
+	"sync"
+
+	"go.uber.org/atomic"
+)
+
+// buffer is a circular buffer for Alerts.
+type buffer struct {
+	mtx          sync.RWMutex
+	data         []*Alert
+	size         int
+	count        int
+	readPointer  int
+	writePointer int
+	hasWork      chan struct{}
+	done         chan struct{}
+	closed       atomic.Bool
+}
+
+func newBuffer(size int) *buffer {
+	return &buffer{
+		data:    make([]*Alert, size),
+		size:    size,
+		hasWork: make(chan struct{}, 1),
+		done:    make(chan struct{}, 1),
+	}
+}
+
+func (b *buffer) push(alerts ...*Alert) (dropped int) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	for _, a := range alerts {
+		if b.count == b.size {
+			b.readPointer = (b.readPointer + 1) % b.size
+			dropped++
+		} else {
+			b.count++
+		}
+		b.data[b.writePointer] = a
+		b.writePointer = (b.writePointer + 1) % b.size
+	}
+
+	// If the buffer still has items left, kick off the next iteration.
+	if b.count > 0 {
+		b.notifyWork()
+	}
+
+	return
+}
+
+// pop moves alerts from the buffer into the passed slice. The number of
+// moved alerts is min(buffered alerts, slice capacity); the slice length
+// is adjusted to match.
+func (b *buffer) pop(alerts *[]*Alert) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	if b.count == 0 {
+		// Empty the alerts slice of any cached data.
+ *alerts = (*alerts)[:0] + return + } + + count := min(b.count, cap(*alerts)) + *alerts = (*alerts)[0:count] + + for i := range count { + (*alerts)[i] = b.data[b.readPointer] + b.data[b.readPointer] = nil + b.readPointer = (b.readPointer + 1) % b.size + b.count-- + } + + // If the buffer still has items left, kick off the next iteration. + if b.count > 0 { + b.notifyWork() + } +} + +func (b *buffer) len() int { + b.mtx.RLock() + defer b.mtx.RUnlock() + return b.count +} + +func (b *buffer) notifyWork() { + if b.isClosed() { + return + } + + // Attempt to send a signal on the 'hasWork' channel if no signal is pending. + select { + case b.hasWork <- struct{}{}: + case <-b.done: + close(b.hasWork) + default: + // No action needed if the channel already has a pending signal. + } +} + +func (b *buffer) close() { + b.done <- struct{}{} + b.closed.Store(true) +} + +func (b *buffer) isClosed() bool { + return b.closed.Load() +} diff --git a/notifier/buffer_test.go b/notifier/buffer_test.go new file mode 100644 index 0000000000..5874b8c3dc --- /dev/null +++ b/notifier/buffer_test.go @@ -0,0 +1,130 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notifier + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +func TestPushAlertsToBuffer(t *testing.T) { + alert1 := &Alert{Labels: labels.FromStrings("alertname", "existing1")} + alert2 := &Alert{Labels: labels.FromStrings("alertname", "existing2")} + + // Initialize a buffer with capacity 5 and 2 existing alerts + b := newBuffer(5) + b.push(alert1, alert2) + require.Equal(t, []*Alert{alert1, alert2, nil, nil, nil}, b.data) + require.Equal(t, 2, b.len(), "Expected buffer length of 2") + + alert3 := &Alert{Labels: labels.FromStrings("alertname", "new1")} + alert4 := &Alert{Labels: labels.FromStrings("alertname", "new2")} + + // Push new alerts to buffer, expect 0 dropped + require.Zero(t, b.push(alert3, alert4), "Expected 0 dropped alerts") + + // Verify all new alerts were added to the buffer + require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, nil}, b.data) + require.Equal(t, 4, b.len(), "Expected buffer length of 4") +} + +// Pushing alerts exceeding buffer capacity should drop oldest alerts. 
+func TestPushAlertsToBufferExceedingCapacity(t *testing.T) {
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+
+	// Initialize a buffer with capacity 3
+	b := newBuffer(3)
+	b.push(alert1, alert2)
+
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+
+	// Push new alerts to buffer, expect 1 dropped
+	require.Equal(t, 1, b.push(alert3, alert4), "Expected 1 dropped alert")
+
+	// Verify all new alerts were added to the buffer; alert4 wraps around to the beginning of the buffer, overwriting alert1
+	require.Equal(t, []*Alert{alert4, alert2, alert3}, b.data, "Expected 3 alerts in the buffer")
+}
+
+// Pushing alerts exceeding total buffer capacity should drop alerts from both old and new.
+func TestPushAlertsToBufferExceedingTotalCapacity(t *testing.T) {
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+
+	// Initialize a buffer with capacity 3
+	b := newBuffer(3)
+	b.push(alert1, alert2)
+
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+	alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
+	alert6 := &Alert{Labels: labels.FromStrings("alertname", "alert6")}
+
+	// Push new alerts to buffer, expect 3 dropped: 1 from the new batch + 2 from existing buffered items
+	require.Equal(t, 3, b.push(alert3, alert4, alert5, alert6), "Expected 3 dropped alerts")
+
+	// Verify all new alerts were added to the buffer
+	require.Equal(t, []*Alert{alert4, alert5, alert6}, b.data, "Expected 3 alerts in the buffer")
+}
+
+func TestPopAlertsFromBuffer(t *testing.T) {
+	// Initialize a buffer with capacity 5
+	b := newBuffer(5)
+
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	b.push(alert1, alert2, alert3)
+
+	// Test 3 alerts in the buffer
+	result1 := make([]*Alert, 3)
+	b.pop(&result1)
+	require.Equal(t, []*Alert{alert1, alert2, alert3}, result1, "Expected all 3 alerts")
+	require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
+	require.Zero(t, b.len(), "Expected buffer length of 0")
+	b.pop(&result1)
+	require.Empty(t, result1, "Expected pop to return empty slice")
+
+	// Test full buffer
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+	alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
+	b.push(alert1, alert2, alert3, alert4, alert5)
+	result2 := make([]*Alert, 5)
+	b.pop(&result2)
+	require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, alert5}, result2, "Expected all 5 alerts")
+	require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
+	require.Zero(t, b.len(), "Expected buffer length of 0")
+	b.pop(&result2)
+	require.Empty(t, result2, "Expected pop to return empty slice")
+
+	// Test smaller max size than capacity
+	b.push(alert1, alert2, alert3, alert4, alert5)
+	result3 := make([]*Alert, 3)
+	b.pop(&result3)
+	require.Equal(t, []*Alert{alert1, alert2, alert3}, result3, "Expected first 3 alerts from buffer")
+	require.Equal(t, 2, b.len(), "Expected buffer length of 2")
+
+	// Pop the remaining 2 alerts in buffer
+	result4 := make([]*Alert, 3)
+
b.pop(&result4) + require.Equal(t, []*Alert{alert4, alert5}, result4, "Expected 2 last alerts from buffer") + require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements") + require.Zero(t, b.len(), "Expected buffer length of 0") + b.pop(&result4) + require.Empty(t, result4, "Expected pop to return empty slice") +} diff --git a/notifier/manager.go b/notifier/manager.go index a79757a066..3230ab3f58 100644 --- a/notifier/manager.go +++ b/notifier/manager.go @@ -14,11 +14,8 @@ package notifier import ( - "bytes" "context" - "encoding/json" "fmt" - "io" "log/slog" "net/http" "net/url" @@ -54,13 +51,11 @@ var userAgent = version.PrometheusUserAgent() // Manager is responsible for dispatching alert notifications to an // alert manager service. type Manager struct { - queue []*Alert - opts *Options + opts *Options metrics *alertMetrics - more chan struct{} - mtx sync.RWMutex + mtx sync.RWMutex stopOnce *sync.Once stopRequested chan struct{} @@ -105,23 +100,15 @@ func NewManager(o *Options, logger *slog.Logger) *Manager { } n := &Manager{ - queue: make([]*Alert, 0, o.QueueCapacity), - more: make(chan struct{}, 1), stopRequested: make(chan struct{}), stopOnce: &sync.Once{}, opts: o, logger: logger, } - queueLenFunc := func() float64 { return float64(n.queueLen()) } alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) } - n.metrics = newAlertMetrics( - o.Registerer, - o.QueueCapacity, - queueLenFunc, - alertmanagersDiscoveredFunc, - ) + n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc) return n } @@ -147,7 +134,7 @@ func (n *Manager) ApplyConfig(conf *config.Config) error { } for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() { - ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics) + ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics) if err != nil { return err } @@ -170,77 +157,15 @@ func (n *Manager) ApplyConfig(conf *config.Config) error { return nil } -func (n *Manager) queueLen() int { - n.mtx.RLock() - defer n.mtx.RUnlock() - - return len(n.queue) -} - -func (n *Manager) nextBatch() []*Alert { - n.mtx.Lock() - defer n.mtx.Unlock() - - var alerts []*Alert - - if maxBatchSize := n.opts.MaxBatchSize; len(n.queue) > maxBatchSize { - alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...) - n.queue = n.queue[maxBatchSize:] - } else { - alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...) - n.queue = n.queue[:0] - } - - return alerts -} - // Run dispatches notifications continuously, returning once Stop has been called and all // pending notifications have been drained from the queue (if draining is enabled). // // Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. // Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - n.targetUpdateLoop(tsets) - }() - - go func() { - defer wg.Done() - n.sendLoop() - n.drainQueue() - }() - - wg.Wait() - n.logger.Info("Notification manager stopped") -} - -// sendLoop continuously consumes the notifications queue and sends alerts to -// the configured Alertmanagers. -func (n *Manager) sendLoop() { - for { - // If we've been asked to stop, that takes priority over sending any further notifications. 
- select { - case <-n.stopRequested: - return - default: - select { - case <-n.stopRequested: - return - - case <-n.more: - n.sendOneBatch() - - // If the queue still has items left, kick off the next iteration. - if n.queueLen() > 0 { - n.setMore() - } - } - } - } + n.targetUpdateLoop(tsets) + <-n.stopRequested + n.drainQueue() } // targetUpdateLoop receives updates of target groups and triggers a reload. @@ -261,31 +186,40 @@ func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) } } -func (n *Manager) sendOneBatch() { - alerts := n.nextBatch() - - if !n.sendAll(alerts...) { - n.metrics.dropped.Add(float64(len(alerts))) - } -} - func (n *Manager) drainQueue() { if !n.opts.DrainOnShutdown { - if n.queueLen() > 0 { - n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) - n.metrics.dropped.Add(float64(n.queueLen())) + for _, ams := range n.alertmanagers { + for am, b := range ams.buffers { + n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "alertmanager", am, "count", b.len()) + n.metrics.dropped.WithLabelValues(am).Add(float64(b.len())) + b.close() + } } - return } n.logger.Info("Draining any remaining notifications...") - for n.queueLen() > 0 { - n.sendOneBatch() + drained := false + for !drained { + remain := false + for _, ams := range n.alertmanagers { + for am, b := range ams.buffers { + if b.len() > 0 { + remain = true + n.logger.Info("Remaining notifications to drain", "alertmanager", am, "count", b.len()) + } + } + } + drained = !remain + time.Sleep(100 * time.Millisecond) + } + n.logger.Info("Remaining notifications drained, stopping send loops") + for _, ams := range n.alertmanagers { + for _, b := range ams.buffers { + b.close() + } } - - n.logger.Info("Remaining notifications drained") } func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { @@ -305,44 +239,23 @@ func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { // Send queues the given notification requests for processing. // Panics if called on a handler that is not running. func (n *Manager) Send(alerts ...*Alert) { - n.mtx.Lock() - defer n.mtx.Unlock() + n.mtx.RLock() + defer n.mtx.RUnlock() alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts) if len(alerts) == 0 { return } - // Queue capacity should be significantly larger than a single alert - // batch could be. - if d := len(alerts) - n.opts.QueueCapacity; d > 0 { - alerts = alerts[d:] - - n.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) - n.metrics.dropped.Add(float64(d)) - } - - // If the queue is full, remove the oldest alerts in favor - // of newer ones. - if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 { - n.queue = n.queue[d:] - - n.logger.Warn("Alert notification queue full, dropping alerts", "num_dropped", d) - n.metrics.dropped.Add(float64(d)) - } - n.queue = append(n.queue, alerts...) - - // Notify sending goroutine that there are alerts to be processed. - n.setMore() -} - -// setMore signals that the alert queue has items. -func (n *Manager) setMore() { - // If we cannot send on the channel, it means the signal already exists - // and has not been consumed yet. - select { - case n.more <- struct{}{}: - default: + for _, ams := range n.alertmanagers { + dropped := ams.send(alerts...) 
+ for am, count := range dropped { + n.logger.Warn("Notification queue is full, and some old notifications have been dropped", "alertmanager", am, "count", count) + n.metrics.dropped.WithLabelValues(am).Add(float64(count)) + } + for am, q := range ams.buffers { + n.metrics.queueLength.WithLabelValues(am).Set(float64(q.len())) + } } } @@ -384,151 +297,6 @@ func (n *Manager) DroppedAlertmanagers() []*url.URL { return res } -// sendAll sends the alerts to all configured Alertmanagers concurrently. -// It returns true if the alerts could be sent successfully to at least one Alertmanager. -func (n *Manager) sendAll(alerts ...*Alert) bool { - if len(alerts) == 0 { - return true - } - - begin := time.Now() - - // cachedPayload represent 'alerts' marshaled for Alertmanager API v2. - // Marshaling happens below. Reference here is for caching between - // for loop iterations. - var cachedPayload []byte - - n.mtx.RLock() - amSets := n.alertmanagers - n.mtx.RUnlock() - - var ( - wg sync.WaitGroup - amSetCovered sync.Map - ) - for k, ams := range amSets { - var ( - payload []byte - err error - amAlerts = alerts - ) - - ams.mtx.RLock() - - if len(ams.ams) == 0 { - ams.mtx.RUnlock() - continue - } - - if len(ams.cfg.AlertRelabelConfigs) > 0 { - amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts) - if len(amAlerts) == 0 { - ams.mtx.RUnlock() - continue - } - // We can't use the cached values from previous iteration. - cachedPayload = nil - } - - switch ams.cfg.APIVersion { - case config.AlertmanagerAPIVersionV2: - { - if cachedPayload == nil { - openAPIAlerts := alertsToOpenAPIAlerts(amAlerts) - - cachedPayload, err = json.Marshal(openAPIAlerts) - if err != nil { - n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err) - ams.mtx.RUnlock() - return false - } - } - - payload = cachedPayload - } - default: - { - n.logger.Error( - fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), - "err", err, - ) - ams.mtx.RUnlock() - return false - } - } - - if len(ams.cfg.AlertRelabelConfigs) > 0 { - // We can't use the cached values on the next iteration. - cachedPayload = nil - } - - // Being here means len(ams.ams) > 0 - amSetCovered.Store(k, false) - for _, am := range ams.ams { - wg.Add(1) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout)) - defer cancel() - - go func(ctx context.Context, k string, client *http.Client, url string, payload []byte, count int) { - err := n.sendOne(ctx, client, url, payload) - if err != nil { - n.logger.Error("Error sending alerts", "alertmanager", url, "count", count, "err", err) - n.metrics.errors.WithLabelValues(url).Add(float64(count)) - } else { - amSetCovered.CompareAndSwap(k, false, true) - } - - n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds()) - n.metrics.sent.WithLabelValues(url).Add(float64(count)) - - wg.Done() - }(ctx, k, ams.client, am.url().String(), payload, len(amAlerts)) - } - - ams.mtx.RUnlock() - } - - wg.Wait() - - // Return false if there are any sets which were attempted (e.g. not filtered - // out) but have no successes. 
- allAmSetsCovered := true - amSetCovered.Range(func(_, value any) bool { - if !value.(bool) { - allAmSetsCovered = false - return false - } - return true - }) - - return allAmSetsCovered -} - -func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error { - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b)) - if err != nil { - return err - } - req.Header.Set("User-Agent", userAgent) - req.Header.Set("Content-Type", contentTypeJSON) - resp, err := n.opts.Do(ctx, c, req) - if err != nil { - return err - } - defer func() { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() - - // Any HTTP status 2xx is OK. - if resp.StatusCode/100 != 2 { - return fmt.Errorf("bad response status %s", resp.Status) - } - - return nil -} - // Stop signals the notification manager to shut down and immediately returns. // // Run will return once the notification manager has successfully shut down. diff --git a/notifier/manager_test.go b/notifier/manager_test.go index 139d90b690..d06bb15e28 100644 --- a/notifier/manager_test.go +++ b/notifier/manager_test.go @@ -14,11 +14,11 @@ package notifier import ( - "bytes" "context" "encoding/json" "fmt" "io" + "log/slog" "net/http" "net/http/httptest" "net/url" @@ -27,6 +27,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" @@ -44,21 +45,42 @@ import ( const maxBatchSize = 256 -func TestHandlerNextBatch(t *testing.T) { +func TestHandlerSendBatch(t *testing.T) { h := NewManager(&Options{}, nil) + b := newBuffer(10_000) + h.alertmanagers = map[string]*alertmanagerSet{ + "mock": { + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return "http://mock" }, + }, + }, + cfg: &config.DefaultAlertmanagerConfig, + buffers: map[string]*buffer{"http://mock": b}, + }, + } + + var alerts []*Alert for i := range make([]struct{}, 2*maxBatchSize+1) { - h.queue = append(h.queue, &Alert{ + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }) } + h.Send(alerts...) - expected := append([]*Alert{}, h.queue...) + expected := append([]*Alert{}, alerts...) 
- require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) - require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch())) - require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) - require.Empty(t, h.queue, "Expected queue to be empty but got %d alerts", len(h.queue)) + batch := make([]*Alert, maxBatchSize) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[0:maxBatchSize], batch)) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], batch)) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[2*maxBatchSize:], batch)) } func alertsEqual(a, b []*Alert) error { @@ -108,11 +130,21 @@ func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string })) } +func getCounterValue(t *testing.T, metric *prometheus.CounterVec, labels ...string) float64 { + t.Helper() + m := &dto.Metric{} + if err := metric.WithLabelValues(labels...).Write(m); err != nil { + t.Fatal(err) + } + return m.Counter.GetValue() +} + func TestHandlerSendAll(t *testing.T) { var ( errc = make(chan error, 1) - expected = make([]*Alert, 0, maxBatchSize) + expected = make([]*Alert, 0) status1, status2, status3 atomic.Int32 + errors1, errors2, errors3 float64 ) status1.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK)) @@ -146,14 +178,21 @@ func TestHandlerSendAll(t *testing.T) { am3Cfg := config.DefaultAlertmanagerConfig am3Cfg.Timeout = model.Duration(time.Second) + opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize} + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + h.alertmanagers["1"] = &alertmanagerSet{ ams: []alertmanager{ alertmanagerMock{ urlf: func() string { return server1.URL }, }, }, - cfg: &am1Cfg, - client: authClient, + cfg: &am1Cfg, + client: authClient, + buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, } h.alertmanagers["2"] = &alertmanagerSet{ @@ -166,15 +205,27 @@ func TestHandlerSendAll(t *testing.T) { }, }, cfg: &am2Cfg, + buffers: map[string]*buffer{ + server2.URL: newBuffer(opts.QueueCapacity), + server3.URL: newBuffer(opts.QueueCapacity), + }, + opts: opts, + metrics: h.metrics, + logger: logger, } h.alertmanagers["3"] = &alertmanagerSet{ - ams: []alertmanager{}, // empty set - cfg: &am3Cfg, + ams: []alertmanager{}, // empty set + cfg: &am3Cfg, + buffers: make(map[string]*buffer), + opts: opts, + metrics: h.metrics, + logger: logger, } + var alerts []*Alert for i := range make([]struct{}, maxBatchSize) { - h.queue = append(h.queue, &Alert{ + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }) expected = append(expected, &Alert{ @@ -191,37 +242,88 @@ func TestHandlerSendAll(t *testing.T) { } } + // start send loops + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + // all ams in all sets are up - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) 
+ time.Sleep(time.Second) + + // snapshot error metrics and check them + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // the only am in set 1 is down status1.Store(int32(http.StatusNotFound)) - require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.NotZero(t, errors1, "server1 has no send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // reset it status1.Store(int32(http.StatusOK)) + // reset metrics + h.metrics.errors.Reset() + // only one of the ams in set 2 is down status2.Store(int32(http.StatusInternalServerError)) - require.True(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.NotZero(t, errors2, "server2 has no send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // both ams in set 2 are down status3.Store(int32(http.StatusInternalServerError)) - require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.NotZero(t, errors2, "server2 has no send errors") + require.NotZero(t, errors3, "server3 has no send errors") checkNoErr() + + // stop send routines by closing buffers + for _, ams := range h.alertmanagers { + for _, q := range ams.buffers { + q.close() + } + } } func TestHandlerSendAllRemapPerAm(t *testing.T) { var ( errc = make(chan error, 1) - expected1 = make([]*Alert, 0, maxBatchSize) - expected2 = make([]*Alert, 0, maxBatchSize) + expected1 = make([]*Alert, 0) + expected2 = make([]*Alert, 0) expected3 = make([]*Alert, 0) status1, status2, status3 atomic.Int32 + errors1, errors2, errors3 float64 ) status1.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK)) @@ -261,6 +363,9 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { }, } + opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize} + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + h.alertmanagers = map[string]*alertmanagerSet{ // Drop no alerts. "1": { @@ -269,7 +374,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server1.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Drop only alerts with the "alertnamedrop" label. 
"2": { @@ -278,7 +387,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server2.URL }, }, }, - cfg: &am2Cfg, + cfg: &am2Cfg, + buffers: map[string]*buffer{server2.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Drop all alerts. "3": { @@ -287,17 +400,26 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server3.URL }, }, }, - cfg: &am3Cfg, + cfg: &am3Cfg, + buffers: map[string]*buffer{server3.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Empty list of Alertmanager endpoints. "4": { - ams: []alertmanager{}, - cfg: &config.DefaultAlertmanagerConfig, + ams: []alertmanager{}, + cfg: &config.DefaultAlertmanagerConfig, + buffers: make(map[string]*buffer), + opts: opts, + metrics: h.metrics, + logger: logger, }, } + var alerts []*Alert for i := range make([]struct{}, maxBatchSize/2) { - h.queue = append(h.queue, + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }, @@ -328,24 +450,66 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { } } + // start send loops + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + // all ams are up - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + // snapshot error metrics and check them + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // the only am in set 1 goes down status1.Store(int32(http.StatusInternalServerError)) - require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.NotZero(t, errors1, "server1 has no send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // reset set 1 status1.Store(int32(http.StatusOK)) + // reset metrics + h.metrics.errors.Reset() + // set 3 loses its only am, but all alerts were dropped // so there was nothing to send, keeping sendAll true status3.Store(int32(http.StatusInternalServerError)) - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(3 * time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() + // stop send routines by closing buffers + for _, ams := range h.alertmanagers { + for _, q := range ams.buffers { + q.close() + } + } + // Verify that individual locks are released. 
for k := range h.alertmanagers { h.alertmanagers[k].mtx.Lock() @@ -354,33 +518,6 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { } } -func TestCustomDo(t *testing.T) { - const testURL = "http://testurl.com/" - const testBody = "testbody" - - var received bool - h := NewManager(&Options{ - Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { - received = true - body, err := io.ReadAll(req.Body) - - require.NoError(t, err) - - require.Equal(t, testBody, string(body)) - - require.Equal(t, testURL, req.URL.String()) - - return &http.Response{ - Body: io.NopCloser(bytes.NewBuffer(nil)), - }, nil - }, - }, nil) - - h.sendOne(context.Background(), nil, testURL, []byte(testBody)) - - require.True(t, received, "Expected to receive an alert, but didn't") -} - func TestExternalLabels(t *testing.T) { h := NewManager(&Options{ QueueCapacity: 3 * maxBatchSize, @@ -397,6 +534,16 @@ func TestExternalLabels(t *testing.T) { }, }, nil) + queue := newBuffer(h.opts.QueueCapacity) + h.alertmanagers = map[string]*alertmanagerSet{ + "test": { + buffers: map[string]*buffer{"test": queue}, + cfg: &config.AlertmanagerConfig{ + RelabelConfigs: h.opts.RelabelConfigs, + }, + }, + } + // This alert should get the external label attached. h.Send(&Alert{ Labels: labels.FromStrings("alertname", "test"), @@ -408,12 +555,15 @@ func TestExternalLabels(t *testing.T) { Labels: labels.FromStrings("alertname", "externalrelabelthis"), }) + alerts := make([]*Alert, maxBatchSize) + queue.pop(&alerts) + expected := []*Alert{ {Labels: labels.FromStrings("alertname", "test", "a", "b")}, {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, } - require.NoError(t, alertsEqual(expected, h.queue)) + require.NoError(t, alertsEqual(expected, alerts)) } func TestHandlerRelabel(t *testing.T) { @@ -436,6 +586,16 @@ func TestHandlerRelabel(t *testing.T) { }, }, nil) + queue := newBuffer(h.opts.QueueCapacity) + h.alertmanagers = map[string]*alertmanagerSet{ + "test": { + buffers: map[string]*buffer{"test": queue}, + cfg: &config.AlertmanagerConfig{ + RelabelConfigs: h.opts.RelabelConfigs, + }, + }, + } + // This alert should be dropped due to the configuration h.Send(&Alert{ Labels: labels.FromStrings("alertname", "drop"), @@ -446,11 +606,14 @@ func TestHandlerRelabel(t *testing.T) { Labels: labels.FromStrings("alertname", "rename"), }) + alerts := make([]*Alert, maxBatchSize) + queue.pop(&alerts) + expected := []*Alert{ {Labels: labels.FromStrings("alertname", "renamed")}, } - require.NoError(t, alertsEqual(expected, h.queue)) + require.NoError(t, alertsEqual(expected, alerts)) } func TestHandlerQueuing(t *testing.T) { @@ -514,8 +677,19 @@ func TestHandlerQueuing(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(h.opts.QueueCapacity)}, + metrics: h.metrics, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } + + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + go h.Run(nil) defer h.Stop() @@ -706,6 +880,7 @@ func makeInputTargetGroup() *targetgroup.Group { // TestHangingNotifier ensures that the notifier takes into account SD changes even when there are // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676. // and https://github.com/prometheus/prometheus/issues/8768. 
+// TODO: Drop this test as we have independent queues per alertmanager now. func TestHangingNotifier(t *testing.T) { const ( batches = 100 @@ -782,7 +957,20 @@ func TestHangingNotifier(t *testing.T) { }, cfg: &amCfg, metrics: notifier.metrics, + buffers: map[string]*buffer{ + faultyURL.String(): newBuffer(notifier.opts.QueueCapacity), + functionalURL.String(): newBuffer(notifier.opts.QueueCapacity), + }, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } + + for _, ams := range notifier.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + go notifier.Run(sdManager.SyncCh()) defer notifier.Stop() @@ -842,10 +1030,13 @@ loop2: // The faulty alertmanager was dropped. if len(notifier.Alertmanagers()) == 1 { // Prevent from TOCTOU. - require.Positive(t, notifier.queueLen()) + for _, ams := range notifier.alertmanagers { + for _, q := range ams.buffers { + require.Zero(t, q.len()) + } + } break loop2 } - require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.") } } } @@ -897,7 +1088,17 @@ func TestStop_DrainingDisabled(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)}, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + metrics: newAlertMetrics(prometheus.DefaultRegisterer, nil), + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + + for _, ams := range m.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } } notificationManagerStopped := make(chan struct{}) @@ -933,7 +1134,8 @@ func TestStop_DrainingDisabled(t *testing.T) { require.FailNow(t, "gave up waiting for notification manager to stop") } - require.Equal(t, int64(1), alertsReceived.Load()) + // At least one alert must have been delivered before notification manager stops. + require.Positive(t, alertsReceived.Load()) } func TestStop_DrainingEnabled(t *testing.T) { @@ -942,9 +1144,6 @@ func TestStop_DrainingEnabled(t *testing.T) { alertsReceived := atomic.NewInt64(0) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Let the test know we've received a request. - receiverReceivedRequest <- struct{}{} - var alerts []*Alert b, err := io.ReadAll(r.Body) @@ -955,6 +1154,9 @@ func TestStop_DrainingEnabled(t *testing.T) { alertsReceived.Add(int64(len(alerts))) + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + // Wait for the test to release us. <-releaseReceiver @@ -983,7 +1185,17 @@ func TestStop_DrainingEnabled(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)}, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + metrics: m.metrics, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + + for _, ams := range m.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } } notificationManagerStopped := make(chan struct{}) @@ -1013,10 +1225,11 @@ func TestStop_DrainingEnabled(t *testing.T) { select { case <-notificationManagerStopped: // Nothing more to do. 
- case <-time.After(200 * time.Millisecond): + case <-time.After(400 * time.Millisecond): require.FailNow(t, "gave up waiting for notification manager to stop") } + <-receiverReceivedRequest require.Equal(t, int64(2), alertsReceived.Load()) } @@ -1149,7 +1362,10 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { urlf: func() string { return blackHoleAM.URL }, }, }, - cfg: &amCfg, + cfg: &amCfg, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + buffers: map[string]*buffer{blackHoleAM.URL: newBuffer(10)}, + metrics: h.metrics, } h.alertmanagers["2"] = &alertmanagerSet{ @@ -1158,16 +1374,23 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { urlf: func() string { return immediateAM.URL }, }, }, - cfg: &amCfg, + cfg: &amCfg, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + buffers: map[string]*buffer{immediateAM.URL: newBuffer(10)}, + metrics: h.metrics, } - h.queue = append(h.queue, &Alert{ - Labels: labels.FromStrings("alertname", "test"), - }) - doneSendAll := make(chan struct{}) go func() { - h.sendAll(h.queue...) + for _, s := range h.alertmanagers { + for _, am := range s.ams { + go s.sendLoop(am) + } + } + + h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "test"), + }) close(doneSendAll) }() @@ -1187,7 +1410,7 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { } func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { // Do nothing, wait to be canceled. <-stop w.WriteHeader(http.StatusOK) @@ -1195,7 +1418,7 @@ func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server { } func newImmediateAlertManager(done chan<- struct{}) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) close(done) })) diff --git a/notifier/metric.go b/notifier/metric.go index 92f10cbe79..ff792d1322 100644 --- a/notifier/metric.go +++ b/notifier/metric.go @@ -19,13 +19,13 @@ type alertMetrics struct { latency *prometheus.SummaryVec errors *prometheus.CounterVec sent *prometheus.CounterVec - dropped prometheus.Counter - queueLength prometheus.GaugeFunc - queueCapacity prometheus.Gauge + dropped *prometheus.CounterVec + queueLength *prometheus.GaugeVec + queueCapacity *prometheus.GaugeVec alertmanagersDiscovered prometheus.GaugeFunc } -func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanagersDiscovered func() float64) *alertMetrics { +func newAlertMetrics(r prometheus.Registerer, alertmanagersDiscovered func() float64) *alertMetrics { m := &alertMetrics{ latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: namespace, @@ -52,32 +52,30 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag }, []string{alertmanagerLabel}, ), - dropped: prometheus.NewCounter(prometheus.CounterOpts{ + dropped: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "dropped_total", Help: "Total number of alerts dropped due to errors when sending to Alertmanager.", - }), - queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + }, []string{alertmanagerLabel}), + queueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: 
subsystem, Name: "queue_length", Help: "The number of alert notifications in the queue.", - }, queueLen), - queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ + }, []string{alertmanagerLabel}), + queueCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_capacity", Help: "The capacity of the alert notifications queue.", - }), + }, []string{alertmanagerLabel}), alertmanagersDiscovered: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_notifications_alertmanagers_discovered", Help: "The number of alertmanagers discovered and active.", }, alertmanagersDiscovered), } - m.queueCapacity.Set(float64(queueCap)) - if r != nil { r.MustRegister( m.latency,