diff --git a/notifier/alert.go b/notifier/alert.go
index 72f313c25e..bc03121ac2 100644
--- a/notifier/alert.go
+++ b/notifier/alert.go
@@ -68,11 +68,26 @@ func (a *Alert) ResolvedAt(ts time.Time) bool {
 	return !a.EndsAt.After(ts)
 }
 
+// Copy returns a copy of the alert.
+func (a *Alert) Copy() *Alert {
+	return &Alert{
+		Labels:       a.Labels.Copy(),
+		Annotations:  a.Annotations.Copy(),
+		StartsAt:     a.StartsAt,
+		EndsAt:       a.EndsAt,
+		GeneratorURL: a.GeneratorURL,
+	}
+}
+
 func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 	var relabeledAlerts []*Alert
 
-	for _, a := range alerts {
+	for _, s := range alerts {
+		// Copy the alert to avoid a race condition between multiple alertmanager sets
+		// holding references to the same alerts.
+		a := s.Copy()
+
 		lb.Reset(a.Labels)
 		externalLabels.Range(func(l labels.Label) {
 			if a.Labels.Get(l.Name) == "" {
diff --git a/notifier/alertmanagerset.go b/notifier/alertmanagerset.go
index bb269d4ed6..2425da91ec 100644
--- a/notifier/alertmanagerset.go
+++ b/notifier/alertmanagerset.go
@@ -14,11 +14,17 @@
 package notifier
 
 import (
+	"bytes"
+	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
 	"log/slog"
 	"net/http"
 	"sync"
+	"time"
 
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/sigv4"
@@ -26,6 +32,7 @@ import (
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
+	"github.com/prometheus/prometheus/model/labels"
 )
 
 // alertmanagerSet contains a set of Alertmanagers discovered via a group of service
@@ -33,16 +40,18 @@ import (
 type alertmanagerSet struct {
 	cfg    *config.AlertmanagerConfig
 	client *http.Client
+	opts   *Options
 
 	metrics *alertMetrics
 
 	mtx        sync.RWMutex
 	ams        []alertmanager
 	droppedAms []alertmanager
+	buffers    map[string]*buffer
 	logger     *slog.Logger
 }
 
-func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
+func newAlertmanagerSet(cfg *config.AlertmanagerConfig, opts *Options, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager")
 	if err != nil {
 		return nil, err
@@ -61,6 +70,8 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, met
 	s := &alertmanagerSet{
 		client:  client,
 		cfg:     cfg,
+		opts:    opts,
+		buffers: make(map[string]*buffer),
 		logger:  logger,
 		metrics: metrics,
 	}
@@ -98,24 +109,31 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
 			continue
 		}
 
-		// This will initialize the Counters for the AM to 0.
-		s.metrics.sent.WithLabelValues(us)
+		// This will initialize the Counters for the AM to 0 and set the static queue capacity gauge.
+		s.metrics.dropped.WithLabelValues(us)
 		s.metrics.errors.WithLabelValues(us)
+		s.metrics.sent.WithLabelValues(us)
+		s.metrics.queueCapacity.WithLabelValues(us).Set(float64(s.opts.QueueCapacity))
 
 		seen[us] = struct{}{}
 		s.ams = append(s.ams, am)
 	}
 
+	s.startSendLoops(allAms)
+
 	// Now remove counters for any removed Alertmanagers.
 	for _, am := range previousAms {
 		us := am.url().String()
 		if _, ok := seen[us]; ok {
 			continue
 		}
 
-		s.metrics.latency.DeleteLabelValues(us)
-		s.metrics.sent.DeleteLabelValues(us)
+		s.metrics.dropped.DeleteLabelValues(us)
 		s.metrics.errors.DeleteLabelValues(us)
+		s.metrics.latency.DeleteLabelValues(us)
+		s.metrics.queueLength.DeleteLabelValues(us)
+		s.metrics.sent.DeleteLabelValues(us)
 		seen[us] = struct{}{}
 	}
+	s.cleanSendLoops(previousAms)
 }
 
 func (s *alertmanagerSet) configHash() (string, error) {
@@ -126,3 +144,149 @@ func (s *alertmanagerSet) configHash() (string, error) {
 	hash := md5.Sum(b)
 	return hex.EncodeToString(hash[:]), nil
 }
+
+func (s *alertmanagerSet) send(alerts ...*Alert) map[string]int {
+	dropped := make(map[string]int)
+
+	if len(s.cfg.AlertRelabelConfigs) > 0 {
+		alerts = relabelAlerts(s.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
+		if len(alerts) == 0 {
+			return dropped
+		}
+	}
+
+	for am, q := range s.buffers {
+		d := q.push(alerts...)
+		dropped[am] += d
+	}
+
+	return dropped
+}
+
+// startSendLoops creates buffers for newly discovered alertmanagers and
+// starts a send loop for each.
+// This function expects the caller to acquire needed locks.
+func (s *alertmanagerSet) startSendLoops(all []alertmanager) {
+	for _, am := range all {
+		us := am.url().String()
+		// create new buffers and start send loops for new alertmanagers in the set.
+		if _, ok := s.buffers[us]; !ok {
+			s.buffers[us] = newBuffer(s.opts.QueueCapacity)
+			go s.sendLoop(am)
+		}
+	}
+}
+
+// cleanSendLoops stops the send loop for each removed alertmanager by
+// closing and removing its buffer.
+// This function expects the caller to acquire needed locks.
+func (s *alertmanagerSet) cleanSendLoops(removed []alertmanager) {
+	for _, am := range removed {
+		us := am.url().String()
+		s.buffers[us].close()
+		delete(s.buffers, us)
+	}
+}
+
+func (s *alertmanagerSet) sendLoop(am alertmanager) {
+	url := am.url().String()
+
+	// Allocate an alerts slice with length and capacity equal to the max batch size.
+ alerts := make([]*Alert, s.opts.MaxBatchSize) + for { + b := s.getBuffer(url) + if b == nil { + return + } + + _, ok := <-b.hasWork + if !ok { + return + } + + b.pop(&alerts) + + if !s.postNotifications(am, alerts) { + s.metrics.dropped.WithLabelValues(url).Add(float64(len(alerts))) + } + } +} + +func (s *alertmanagerSet) postNotifications(am alertmanager, alerts []*Alert) bool { + if len(alerts) == 0 { + return true + } + + begin := time.Now() + + var payload []byte + var err error + switch s.cfg.APIVersion { + case config.AlertmanagerAPIVersionV2: + { + openAPIAlerts := alertsToOpenAPIAlerts(alerts) + + payload, err = json.Marshal(openAPIAlerts) + if err != nil { + s.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err) + return false + } + } + + default: + { + s.logger.Error( + fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", s.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), + "err", err, + ) + return false + } + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.cfg.Timeout)) + defer cancel() + + url := am.url().String() + if err := s.sendOne(ctx, s.client, url, payload); err != nil { + s.logger.Error("Error sending alerts", "alertmanager", url, "count", len(alerts), "err", err) + s.metrics.errors.WithLabelValues(url).Add(float64(len(alerts))) + return false + } + s.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds()) + s.metrics.sent.WithLabelValues(url).Add(float64(len(alerts))) + + return true +} + +func (s *alertmanagerSet) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error { + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b)) + if err != nil { + return err + } + req.Header.Set("User-Agent", userAgent) + req.Header.Set("Content-Type", contentTypeJSON) + resp, err := s.opts.Do(ctx, c, req) + if err != nil { + return err + } + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + + // Any HTTP status 2xx is OK. + if resp.StatusCode/100 != 2 { + return fmt.Errorf("bad response status %s", resp.Status) + } + + return nil +} + +func (s *alertmanagerSet) getBuffer(url string) *buffer { + s.mtx.RLock() + defer s.mtx.RUnlock() + if q, ok := s.buffers[url]; ok { + return q + } + return nil +} diff --git a/notifier/alertmanagerset_test.go b/notifier/alertmanagerset_test.go new file mode 100644 index 0000000000..3593e375b3 --- /dev/null +++ b/notifier/alertmanagerset_test.go @@ -0,0 +1,53 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package notifier
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCustomDo(t *testing.T) {
+	const testURL = "http://testurl.com/"
+	const testBody = "testbody"
+
+	var received bool
+	h := alertmanagerSet{
+		opts: &Options{
+			Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
+				received = true
+				body, err := io.ReadAll(req.Body)
+
+				require.NoError(t, err)
+
+				require.Equal(t, testBody, string(body))
+
+				require.Equal(t, testURL, req.URL.String())
+
+				return &http.Response{
+					Body: io.NopCloser(bytes.NewBuffer(nil)),
+				}, nil
+			},
+		},
+	}
+
+	h.sendOne(context.Background(), nil, testURL, []byte(testBody))
+
+	require.True(t, received, "Expected to receive an alert, but didn't")
+}
diff --git a/notifier/buffer.go b/notifier/buffer.go
new file mode 100644
index 0000000000..2d0aa64e6c
--- /dev/null
+++ b/notifier/buffer.go
@@ -0,0 +1,124 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notifier
+
+import (
+	"sync"
+
+	"go.uber.org/atomic"
+)
+
+// buffer is a circular buffer for Alerts.
+type buffer struct {
+	mtx          sync.RWMutex
+	data         []*Alert
+	size         int
+	count        int
+	readPointer  int
+	writePointer int
+	hasWork      chan struct{}
+	done         chan struct{}
+	closed       atomic.Bool
+}
+
+func newBuffer(size int) *buffer {
+	return &buffer{
+		data:    make([]*Alert, size),
+		size:    size,
+		hasWork: make(chan struct{}, 1),
+		done:    make(chan struct{}, 1),
+	}
+}
+
+func (b *buffer) push(alerts ...*Alert) (dropped int) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	for _, a := range alerts {
+		if b.count == b.size {
+			b.readPointer = (b.readPointer + 1) % b.size
+			dropped++
+		} else {
+			b.count++
+		}
+		b.data[b.writePointer] = a
+		b.writePointer = (b.writePointer + 1) % b.size
+	}
+
+	// Signal the consumer that the buffer has alerts to send.
+	if b.count > 0 {
+		b.notifyWork()
+	}
+
+	return
+}
+
+// pop moves alerts from the buffer into the passed slice.
+// The number of moved alerts is the minimum of the buffer length and the slice capacity.
+// The slice length is adjusted to the number of alerts actually moved.
+func (b *buffer) pop(alerts *[]*Alert) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	if b.count == 0 {
+		// Empty alerts from any cached data.
+		*alerts = (*alerts)[:0]
+		return
+	}
+
+	count := min(b.count, cap(*alerts))
+	*alerts = (*alerts)[0:count]
+
+	for i := range count {
+		(*alerts)[i] = b.data[b.readPointer]
+		b.data[b.readPointer] = nil
+		b.readPointer = (b.readPointer + 1) % b.size
+		b.count--
+	}
+
+	// If the buffer still has items left, kick off the next iteration.
+	if b.count > 0 {
+		b.notifyWork()
+	}
+}
+
+func (b *buffer) len() int {
+	b.mtx.RLock()
+	defer b.mtx.RUnlock()
+	return b.count
+}
+
+func (b *buffer) notifyWork() {
+	if b.isClosed() {
+		return
+	}
+
+	// Attempt to send a signal on the 'hasWork' channel if no signal is pending.
+	select {
+	case b.hasWork <- struct{}{}:
+	case <-b.done:
+		close(b.hasWork)
+	default:
+		// No action needed if the channel already has a pending signal.
+	}
+}
+
+func (b *buffer) close() {
+	b.done <- struct{}{}
+	b.closed.Store(true)
+}
+
+func (b *buffer) isClosed() bool {
+	return b.closed.Load()
+}
diff --git a/notifier/buffer_test.go b/notifier/buffer_test.go
new file mode 100644
index 0000000000..5874b8c3dc
--- /dev/null
+++ b/notifier/buffer_test.go
@@ -0,0 +1,130 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notifier
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+func TestPushAlertsToBuffer(t *testing.T) {
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "existing1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "existing2")}
+
+	// Initialize a buffer with capacity 5 and 2 existing alerts
+	b := newBuffer(5)
+	b.push(alert1, alert2)
+	require.Equal(t, []*Alert{alert1, alert2, nil, nil, nil}, b.data)
+	require.Equal(t, 2, b.len(), "Expected buffer length of 2")
+
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "new1")}
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "new2")}
+
+	// Push new alerts to buffer, expect 0 dropped
+	require.Zero(t, b.push(alert3, alert4), "Expected 0 dropped alerts")
+
+	// Verify all new alerts were added to the buffer
+	require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, nil}, b.data)
+	require.Equal(t, 4, b.len(), "Expected buffer length of 4")
+}
+
+// Pushing alerts exceeding the buffer capacity should drop the oldest alerts.
+func TestPushAlertsToBufferExceedingCapacity(t *testing.T) {
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+
+	// Initialize a buffer with capacity 3
+	b := newBuffer(3)
+	b.push(alert1, alert2)
+
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+
+	// Push new alerts to buffer, expect 1 dropped
+	require.Equal(t, 1, b.push(alert3, alert4), "Expected 1 dropped alert")
+
+	// Verify all new alerts were added to the buffer; alert4 wraps around to the beginning of the buffer, overwriting alert1
+	require.Equal(t, []*Alert{alert4, alert2, alert3}, b.data, "Expected 3 alerts in the buffer")
+}
+
+// Pushing alerts exceeding the total buffer capacity should drop alerts from both the existing buffer and the new batch.
+func TestPushAlertsToBufferExceedingTotalCapacity(t *testing.T) {
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+
+	// Initialize a buffer with capacity 3
+	b := newBuffer(3)
+	b.push(alert1, alert2)
+
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+	alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
+	alert6 := &Alert{Labels: labels.FromStrings("alertname", "alert6")}
+
+	// Push new alerts to buffer, expect 3 dropped: 1 from the new batch + 2 from existing buffered items
+	require.Equal(t, 3, b.push(alert3, alert4, alert5, alert6), "Expected 3 dropped alerts")
+
+	// Verify the buffer holds only the 3 newest alerts
+	require.Equal(t, []*Alert{alert4, alert5, alert6}, b.data, "Expected 3 alerts in the buffer")
+}
+
+func TestPopAlertsFromBuffer(t *testing.T) {
+	// Initialize a buffer with capacity 5
+	b := newBuffer(5)
+
+	alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
+	alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
+	alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
+	b.push(alert1, alert2, alert3)
+
+	// Test 3 alerts in the buffer
+	result1 := make([]*Alert, 3)
+	b.pop(&result1)
+	require.Equal(t, []*Alert{alert1, alert2, alert3}, result1, "Expected all 3 alerts")
+	require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
+	require.Zero(t, b.len(), "Expected buffer length of 0")
+	b.pop(&result1)
+	require.Empty(t, result1, "Expected pop to return empty slice")
+
+	// Test full buffer
+	alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
+	alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
+	b.push(alert1, alert2, alert3, alert4, alert5)
+	result2 := make([]*Alert, 5)
+	b.pop(&result2)
+	require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, alert5}, result2, "Expected all 5 alerts")
+	require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
+	require.Zero(t, b.len(), "Expected buffer length of 0")
+	b.pop(&result2)
+	require.Empty(t, result2, "Expected pop to return empty slice")
+
+	// Test smaller max size than capacity
+	b.push(alert1, alert2, alert3, alert4, alert5)
+	result3 := make([]*Alert, 3)
+	b.pop(&result3)
+	require.Equal(t, []*Alert{alert1, alert2, alert3}, result3, "Expected 3 first alerts from buffer")
+	require.Equal(t, 2, b.len(), "Expected buffer length of 2")
+
+	// Pop the remaining 2 alerts in buffer
+	result4 := make([]*Alert, 3)
+	b.pop(&result4)
+	require.Equal(t, []*Alert{alert4, alert5}, result4, "Expected 2 last alerts from buffer")
+	require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
+	require.Zero(t, b.len(), "Expected buffer length of 0")
+	b.pop(&result4)
+	require.Empty(t, result4, "Expected pop to return empty slice")
+}
diff --git a/notifier/manager.go b/notifier/manager.go
index a79757a066..3230ab3f58 100644
--- a/notifier/manager.go
+++ b/notifier/manager.go
@@ -14,11 +14,8 @@
 package notifier
 
 import (
-	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
-	"io"
 	"log/slog"
 	"net/http"
 	"net/url"
@@ -54,13 +51,11 @@ var userAgent = version.PrometheusUserAgent()
 
 // Manager is responsible for dispatching alert notifications to an
 // alert manager service.
type Manager struct { - queue []*Alert - opts *Options + opts *Options metrics *alertMetrics - more chan struct{} - mtx sync.RWMutex + mtx sync.RWMutex stopOnce *sync.Once stopRequested chan struct{} @@ -105,23 +100,15 @@ func NewManager(o *Options, logger *slog.Logger) *Manager { } n := &Manager{ - queue: make([]*Alert, 0, o.QueueCapacity), - more: make(chan struct{}, 1), stopRequested: make(chan struct{}), stopOnce: &sync.Once{}, opts: o, logger: logger, } - queueLenFunc := func() float64 { return float64(n.queueLen()) } alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) } - n.metrics = newAlertMetrics( - o.Registerer, - o.QueueCapacity, - queueLenFunc, - alertmanagersDiscoveredFunc, - ) + n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc) return n } @@ -147,7 +134,7 @@ func (n *Manager) ApplyConfig(conf *config.Config) error { } for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() { - ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics) + ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics) if err != nil { return err } @@ -170,77 +157,15 @@ func (n *Manager) ApplyConfig(conf *config.Config) error { return nil } -func (n *Manager) queueLen() int { - n.mtx.RLock() - defer n.mtx.RUnlock() - - return len(n.queue) -} - -func (n *Manager) nextBatch() []*Alert { - n.mtx.Lock() - defer n.mtx.Unlock() - - var alerts []*Alert - - if maxBatchSize := n.opts.MaxBatchSize; len(n.queue) > maxBatchSize { - alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...) - n.queue = n.queue[maxBatchSize:] - } else { - alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...) - n.queue = n.queue[:0] - } - - return alerts -} - // Run dispatches notifications continuously, returning once Stop has been called and all // pending notifications have been drained from the queue (if draining is enabled). // // Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. // Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - n.targetUpdateLoop(tsets) - }() - - go func() { - defer wg.Done() - n.sendLoop() - n.drainQueue() - }() - - wg.Wait() - n.logger.Info("Notification manager stopped") -} - -// sendLoop continuously consumes the notifications queue and sends alerts to -// the configured Alertmanagers. -func (n *Manager) sendLoop() { - for { - // If we've been asked to stop, that takes priority over sending any further notifications. - select { - case <-n.stopRequested: - return - default: - select { - case <-n.stopRequested: - return - - case <-n.more: - n.sendOneBatch() - - // If the queue still has items left, kick off the next iteration. - if n.queueLen() > 0 { - n.setMore() - } - } - } - } + n.targetUpdateLoop(tsets) + <-n.stopRequested + n.drainQueue() } // targetUpdateLoop receives updates of target groups and triggers a reload. @@ -261,31 +186,40 @@ func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) } } -func (n *Manager) sendOneBatch() { - alerts := n.nextBatch() - - if !n.sendAll(alerts...) 
{ - n.metrics.dropped.Add(float64(len(alerts))) - } -} - func (n *Manager) drainQueue() { if !n.opts.DrainOnShutdown { - if n.queueLen() > 0 { - n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) - n.metrics.dropped.Add(float64(n.queueLen())) + for _, ams := range n.alertmanagers { + for am, b := range ams.buffers { + n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "alertmanager", am, "count", b.len()) + n.metrics.dropped.WithLabelValues(am).Add(float64(b.len())) + b.close() + } } - return } n.logger.Info("Draining any remaining notifications...") - for n.queueLen() > 0 { - n.sendOneBatch() + drained := false + for !drained { + remain := false + for _, ams := range n.alertmanagers { + for am, b := range ams.buffers { + if b.len() > 0 { + remain = true + n.logger.Info("Remaining notifications to drain", "alertmanager", am, "count", b.len()) + } + } + } + drained = !remain + time.Sleep(100 * time.Millisecond) + } + n.logger.Info("Remaining notifications drained, stopping send loops") + for _, ams := range n.alertmanagers { + for _, b := range ams.buffers { + b.close() + } } - - n.logger.Info("Remaining notifications drained") } func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { @@ -305,44 +239,23 @@ func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { // Send queues the given notification requests for processing. // Panics if called on a handler that is not running. func (n *Manager) Send(alerts ...*Alert) { - n.mtx.Lock() - defer n.mtx.Unlock() + n.mtx.RLock() + defer n.mtx.RUnlock() alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts) if len(alerts) == 0 { return } - // Queue capacity should be significantly larger than a single alert - // batch could be. - if d := len(alerts) - n.opts.QueueCapacity; d > 0 { - alerts = alerts[d:] - - n.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) - n.metrics.dropped.Add(float64(d)) - } - - // If the queue is full, remove the oldest alerts in favor - // of newer ones. - if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 { - n.queue = n.queue[d:] - - n.logger.Warn("Alert notification queue full, dropping alerts", "num_dropped", d) - n.metrics.dropped.Add(float64(d)) - } - n.queue = append(n.queue, alerts...) - - // Notify sending goroutine that there are alerts to be processed. - n.setMore() -} - -// setMore signals that the alert queue has items. -func (n *Manager) setMore() { - // If we cannot send on the channel, it means the signal already exists - // and has not been consumed yet. - select { - case n.more <- struct{}{}: - default: + for _, ams := range n.alertmanagers { + dropped := ams.send(alerts...) + for am, count := range dropped { + n.logger.Warn("Notification queue is full, and some old notifications have been dropped", "alertmanager", am, "count", count) + n.metrics.dropped.WithLabelValues(am).Add(float64(count)) + } + for am, q := range ams.buffers { + n.metrics.queueLength.WithLabelValues(am).Set(float64(q.len())) + } } } @@ -384,151 +297,6 @@ func (n *Manager) DroppedAlertmanagers() []*url.URL { return res } -// sendAll sends the alerts to all configured Alertmanagers concurrently. -// It returns true if the alerts could be sent successfully to at least one Alertmanager. 
-func (n *Manager) sendAll(alerts ...*Alert) bool { - if len(alerts) == 0 { - return true - } - - begin := time.Now() - - // cachedPayload represent 'alerts' marshaled for Alertmanager API v2. - // Marshaling happens below. Reference here is for caching between - // for loop iterations. - var cachedPayload []byte - - n.mtx.RLock() - amSets := n.alertmanagers - n.mtx.RUnlock() - - var ( - wg sync.WaitGroup - amSetCovered sync.Map - ) - for k, ams := range amSets { - var ( - payload []byte - err error - amAlerts = alerts - ) - - ams.mtx.RLock() - - if len(ams.ams) == 0 { - ams.mtx.RUnlock() - continue - } - - if len(ams.cfg.AlertRelabelConfigs) > 0 { - amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts) - if len(amAlerts) == 0 { - ams.mtx.RUnlock() - continue - } - // We can't use the cached values from previous iteration. - cachedPayload = nil - } - - switch ams.cfg.APIVersion { - case config.AlertmanagerAPIVersionV2: - { - if cachedPayload == nil { - openAPIAlerts := alertsToOpenAPIAlerts(amAlerts) - - cachedPayload, err = json.Marshal(openAPIAlerts) - if err != nil { - n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err) - ams.mtx.RUnlock() - return false - } - } - - payload = cachedPayload - } - default: - { - n.logger.Error( - fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), - "err", err, - ) - ams.mtx.RUnlock() - return false - } - } - - if len(ams.cfg.AlertRelabelConfigs) > 0 { - // We can't use the cached values on the next iteration. - cachedPayload = nil - } - - // Being here means len(ams.ams) > 0 - amSetCovered.Store(k, false) - for _, am := range ams.ams { - wg.Add(1) - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout)) - defer cancel() - - go func(ctx context.Context, k string, client *http.Client, url string, payload []byte, count int) { - err := n.sendOne(ctx, client, url, payload) - if err != nil { - n.logger.Error("Error sending alerts", "alertmanager", url, "count", count, "err", err) - n.metrics.errors.WithLabelValues(url).Add(float64(count)) - } else { - amSetCovered.CompareAndSwap(k, false, true) - } - - n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds()) - n.metrics.sent.WithLabelValues(url).Add(float64(count)) - - wg.Done() - }(ctx, k, ams.client, am.url().String(), payload, len(amAlerts)) - } - - ams.mtx.RUnlock() - } - - wg.Wait() - - // Return false if there are any sets which were attempted (e.g. not filtered - // out) but have no successes. - allAmSetsCovered := true - amSetCovered.Range(func(_, value any) bool { - if !value.(bool) { - allAmSetsCovered = false - return false - } - return true - }) - - return allAmSetsCovered -} - -func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error { - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b)) - if err != nil { - return err - } - req.Header.Set("User-Agent", userAgent) - req.Header.Set("Content-Type", contentTypeJSON) - resp, err := n.opts.Do(ctx, c, req) - if err != nil { - return err - } - defer func() { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() - - // Any HTTP status 2xx is OK. - if resp.StatusCode/100 != 2 { - return fmt.Errorf("bad response status %s", resp.Status) - } - - return nil -} - // Stop signals the notification manager to shut down and immediately returns. 
// // Run will return once the notification manager has successfully shut down. diff --git a/notifier/manager_test.go b/notifier/manager_test.go index 139d90b690..d06bb15e28 100644 --- a/notifier/manager_test.go +++ b/notifier/manager_test.go @@ -14,11 +14,11 @@ package notifier import ( - "bytes" "context" "encoding/json" "fmt" "io" + "log/slog" "net/http" "net/http/httptest" "net/url" @@ -27,6 +27,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" @@ -44,21 +45,42 @@ import ( const maxBatchSize = 256 -func TestHandlerNextBatch(t *testing.T) { +func TestHandlerSendBatch(t *testing.T) { h := NewManager(&Options{}, nil) + b := newBuffer(10_000) + h.alertmanagers = map[string]*alertmanagerSet{ + "mock": { + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return "http://mock" }, + }, + }, + cfg: &config.DefaultAlertmanagerConfig, + buffers: map[string]*buffer{"http://mock": b}, + }, + } + + var alerts []*Alert for i := range make([]struct{}, 2*maxBatchSize+1) { - h.queue = append(h.queue, &Alert{ + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }) } + h.Send(alerts...) - expected := append([]*Alert{}, h.queue...) + expected := append([]*Alert{}, alerts...) - require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) - require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch())) - require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) - require.Empty(t, h.queue, "Expected queue to be empty but got %d alerts", len(h.queue)) + batch := make([]*Alert, maxBatchSize) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[0:maxBatchSize], batch)) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], batch)) + + b.pop(&batch) + require.NoError(t, alertsEqual(expected[2*maxBatchSize:], batch)) } func alertsEqual(a, b []*Alert) error { @@ -108,11 +130,21 @@ func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string })) } +func getCounterValue(t *testing.T, metric *prometheus.CounterVec, labels ...string) float64 { + t.Helper() + m := &dto.Metric{} + if err := metric.WithLabelValues(labels...).Write(m); err != nil { + t.Fatal(err) + } + return m.Counter.GetValue() +} + func TestHandlerSendAll(t *testing.T) { var ( errc = make(chan error, 1) - expected = make([]*Alert, 0, maxBatchSize) + expected = make([]*Alert, 0) status1, status2, status3 atomic.Int32 + errors1, errors2, errors3 float64 ) status1.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK)) @@ -146,14 +178,21 @@ func TestHandlerSendAll(t *testing.T) { am3Cfg := config.DefaultAlertmanagerConfig am3Cfg.Timeout = model.Duration(time.Second) + opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize} + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + h.alertmanagers["1"] = &alertmanagerSet{ ams: []alertmanager{ alertmanagerMock{ urlf: func() string { return server1.URL }, }, }, - cfg: &am1Cfg, - client: authClient, + cfg: &am1Cfg, + client: authClient, + buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, } h.alertmanagers["2"] = &alertmanagerSet{ @@ -166,15 +205,27 @@ func TestHandlerSendAll(t *testing.T) { }, }, cfg: &am2Cfg, + buffers: map[string]*buffer{ + 
server2.URL: newBuffer(opts.QueueCapacity), + server3.URL: newBuffer(opts.QueueCapacity), + }, + opts: opts, + metrics: h.metrics, + logger: logger, } h.alertmanagers["3"] = &alertmanagerSet{ - ams: []alertmanager{}, // empty set - cfg: &am3Cfg, + ams: []alertmanager{}, // empty set + cfg: &am3Cfg, + buffers: make(map[string]*buffer), + opts: opts, + metrics: h.metrics, + logger: logger, } + var alerts []*Alert for i := range make([]struct{}, maxBatchSize) { - h.queue = append(h.queue, &Alert{ + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }) expected = append(expected, &Alert{ @@ -191,37 +242,88 @@ func TestHandlerSendAll(t *testing.T) { } } + // start send loops + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + // all ams in all sets are up - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + // snapshot error metrics and check them + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // the only am in set 1 is down status1.Store(int32(http.StatusNotFound)) - require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.NotZero(t, errors1, "server1 has no send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // reset it status1.Store(int32(http.StatusOK)) + // reset metrics + h.metrics.errors.Reset() + // only one of the ams in set 2 is down status2.Store(int32(http.StatusInternalServerError)) - require.True(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.NotZero(t, errors2, "server2 has no send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // both ams in set 2 are down status3.Store(int32(http.StatusInternalServerError)) - require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + h.Send(alerts...) 
+ time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.NotZero(t, errors2, "server2 has no send errors") + require.NotZero(t, errors3, "server3 has no send errors") checkNoErr() + + // stop send routines by closing buffers + for _, ams := range h.alertmanagers { + for _, q := range ams.buffers { + q.close() + } + } } func TestHandlerSendAllRemapPerAm(t *testing.T) { var ( errc = make(chan error, 1) - expected1 = make([]*Alert, 0, maxBatchSize) - expected2 = make([]*Alert, 0, maxBatchSize) + expected1 = make([]*Alert, 0) + expected2 = make([]*Alert, 0) expected3 = make([]*Alert, 0) status1, status2, status3 atomic.Int32 + errors1, errors2, errors3 float64 ) status1.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK)) @@ -261,6 +363,9 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { }, } + opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize} + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + h.alertmanagers = map[string]*alertmanagerSet{ // Drop no alerts. "1": { @@ -269,7 +374,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server1.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Drop only alerts with the "alertnamedrop" label. "2": { @@ -278,7 +387,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server2.URL }, }, }, - cfg: &am2Cfg, + cfg: &am2Cfg, + buffers: map[string]*buffer{server2.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Drop all alerts. "3": { @@ -287,17 +400,26 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { urlf: func() string { return server3.URL }, }, }, - cfg: &am3Cfg, + cfg: &am3Cfg, + buffers: map[string]*buffer{server3.URL: newBuffer(opts.QueueCapacity)}, + opts: opts, + metrics: h.metrics, + logger: logger, }, // Empty list of Alertmanager endpoints. "4": { - ams: []alertmanager{}, - cfg: &config.DefaultAlertmanagerConfig, + ams: []alertmanager{}, + cfg: &config.DefaultAlertmanagerConfig, + buffers: make(map[string]*buffer), + opts: opts, + metrics: h.metrics, + logger: logger, }, } + var alerts []*Alert for i := range make([]struct{}, maxBatchSize/2) { - h.queue = append(h.queue, + alerts = append(alerts, &Alert{ Labels: labels.FromStrings("alertname", strconv.Itoa(i)), }, @@ -328,24 +450,66 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { } } + // start send loops + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + // all ams are up - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) 
+ time.Sleep(time.Second) + + // snapshot error metrics and check them + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // the only am in set 1 goes down status1.Store(int32(http.StatusInternalServerError)) - require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.NotZero(t, errors1, "server1 has no send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() // reset set 1 status1.Store(int32(http.StatusOK)) + // reset metrics + h.metrics.errors.Reset() + // set 3 loses its only am, but all alerts were dropped // so there was nothing to send, keeping sendAll true status3.Store(int32(http.StatusInternalServerError)) - require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + h.Send(alerts...) + time.Sleep(3 * time.Second) + + errors1 = getCounterValue(t, h.metrics.errors, server1.URL) + errors2 = getCounterValue(t, h.metrics.errors, server2.URL) + errors3 = getCounterValue(t, h.metrics.errors, server3.URL) + require.Zero(t, errors1, "server1 has unexpected send errors") + require.Zero(t, errors2, "server2 has unexpected send errors") + require.Zero(t, errors3, "server3 has unexpected send errors") checkNoErr() + // stop send routines by closing buffers + for _, ams := range h.alertmanagers { + for _, q := range ams.buffers { + q.close() + } + } + // Verify that individual locks are released. for k := range h.alertmanagers { h.alertmanagers[k].mtx.Lock() @@ -354,33 +518,6 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) { } } -func TestCustomDo(t *testing.T) { - const testURL = "http://testurl.com/" - const testBody = "testbody" - - var received bool - h := NewManager(&Options{ - Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { - received = true - body, err := io.ReadAll(req.Body) - - require.NoError(t, err) - - require.Equal(t, testBody, string(body)) - - require.Equal(t, testURL, req.URL.String()) - - return &http.Response{ - Body: io.NopCloser(bytes.NewBuffer(nil)), - }, nil - }, - }, nil) - - h.sendOne(context.Background(), nil, testURL, []byte(testBody)) - - require.True(t, received, "Expected to receive an alert, but didn't") -} - func TestExternalLabels(t *testing.T) { h := NewManager(&Options{ QueueCapacity: 3 * maxBatchSize, @@ -397,6 +534,16 @@ func TestExternalLabels(t *testing.T) { }, }, nil) + queue := newBuffer(h.opts.QueueCapacity) + h.alertmanagers = map[string]*alertmanagerSet{ + "test": { + buffers: map[string]*buffer{"test": queue}, + cfg: &config.AlertmanagerConfig{ + RelabelConfigs: h.opts.RelabelConfigs, + }, + }, + } + // This alert should get the external label attached. 
h.Send(&Alert{ Labels: labels.FromStrings("alertname", "test"), @@ -408,12 +555,15 @@ func TestExternalLabels(t *testing.T) { Labels: labels.FromStrings("alertname", "externalrelabelthis"), }) + alerts := make([]*Alert, maxBatchSize) + queue.pop(&alerts) + expected := []*Alert{ {Labels: labels.FromStrings("alertname", "test", "a", "b")}, {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, } - require.NoError(t, alertsEqual(expected, h.queue)) + require.NoError(t, alertsEqual(expected, alerts)) } func TestHandlerRelabel(t *testing.T) { @@ -436,6 +586,16 @@ func TestHandlerRelabel(t *testing.T) { }, }, nil) + queue := newBuffer(h.opts.QueueCapacity) + h.alertmanagers = map[string]*alertmanagerSet{ + "test": { + buffers: map[string]*buffer{"test": queue}, + cfg: &config.AlertmanagerConfig{ + RelabelConfigs: h.opts.RelabelConfigs, + }, + }, + } + // This alert should be dropped due to the configuration h.Send(&Alert{ Labels: labels.FromStrings("alertname", "drop"), @@ -446,11 +606,14 @@ func TestHandlerRelabel(t *testing.T) { Labels: labels.FromStrings("alertname", "rename"), }) + alerts := make([]*Alert, maxBatchSize) + queue.pop(&alerts) + expected := []*Alert{ {Labels: labels.FromStrings("alertname", "renamed")}, } - require.NoError(t, alertsEqual(expected, h.queue)) + require.NoError(t, alertsEqual(expected, alerts)) } func TestHandlerQueuing(t *testing.T) { @@ -514,8 +677,19 @@ func TestHandlerQueuing(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(h.opts.QueueCapacity)}, + metrics: h.metrics, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } + + for _, ams := range h.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + go h.Run(nil) defer h.Stop() @@ -706,6 +880,7 @@ func makeInputTargetGroup() *targetgroup.Group { // TestHangingNotifier ensures that the notifier takes into account SD changes even when there are // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676. // and https://github.com/prometheus/prometheus/issues/8768. +// TODO: Drop this test as we have independent queues per alertmanager now. func TestHangingNotifier(t *testing.T) { const ( batches = 100 @@ -782,7 +957,20 @@ func TestHangingNotifier(t *testing.T) { }, cfg: &amCfg, metrics: notifier.metrics, + buffers: map[string]*buffer{ + faultyURL.String(): newBuffer(notifier.opts.QueueCapacity), + functionalURL.String(): newBuffer(notifier.opts.QueueCapacity), + }, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } + + for _, ams := range notifier.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } + } + go notifier.Run(sdManager.SyncCh()) defer notifier.Stop() @@ -842,10 +1030,13 @@ loop2: // The faulty alertmanager was dropped. if len(notifier.Alertmanagers()) == 1 { // Prevent from TOCTOU. 
- require.Positive(t, notifier.queueLen()) + for _, ams := range notifier.alertmanagers { + for _, q := range ams.buffers { + require.Zero(t, q.len()) + } + } break loop2 } - require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.") } } } @@ -897,7 +1088,17 @@ func TestStop_DrainingDisabled(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)}, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + metrics: newAlertMetrics(prometheus.DefaultRegisterer, nil), + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + + for _, ams := range m.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } } notificationManagerStopped := make(chan struct{}) @@ -933,7 +1134,8 @@ func TestStop_DrainingDisabled(t *testing.T) { require.FailNow(t, "gave up waiting for notification manager to stop") } - require.Equal(t, int64(1), alertsReceived.Load()) + // At least one alert must have been delivered before notification manager stops. + require.Positive(t, alertsReceived.Load()) } func TestStop_DrainingEnabled(t *testing.T) { @@ -942,9 +1144,6 @@ func TestStop_DrainingEnabled(t *testing.T) { alertsReceived := atomic.NewInt64(0) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Let the test know we've received a request. - receiverReceivedRequest <- struct{}{} - var alerts []*Alert b, err := io.ReadAll(r.Body) @@ -955,6 +1154,9 @@ func TestStop_DrainingEnabled(t *testing.T) { alertsReceived.Add(int64(len(alerts))) + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + // Wait for the test to release us. <-releaseReceiver @@ -983,7 +1185,17 @@ func TestStop_DrainingEnabled(t *testing.T) { urlf: func() string { return server.URL }, }, }, - cfg: &am1Cfg, + cfg: &am1Cfg, + buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)}, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + metrics: m.metrics, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + + for _, ams := range m.alertmanagers { + for _, am := range ams.ams { + go ams.sendLoop(am) + } } notificationManagerStopped := make(chan struct{}) @@ -1013,10 +1225,11 @@ func TestStop_DrainingEnabled(t *testing.T) { select { case <-notificationManagerStopped: // Nothing more to do. - case <-time.After(200 * time.Millisecond): + case <-time.After(400 * time.Millisecond): require.FailNow(t, "gave up waiting for notification manager to stop") } + <-receiverReceivedRequest require.Equal(t, int64(2), alertsReceived.Load()) } @@ -1149,7 +1362,10 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { urlf: func() string { return blackHoleAM.URL }, }, }, - cfg: &amCfg, + cfg: &amCfg, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + buffers: map[string]*buffer{blackHoleAM.URL: newBuffer(10)}, + metrics: h.metrics, } h.alertmanagers["2"] = &alertmanagerSet{ @@ -1158,16 +1374,23 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { urlf: func() string { return immediateAM.URL }, }, }, - cfg: &amCfg, + cfg: &amCfg, + opts: &Options{Do: do, MaxBatchSize: maxBatchSize}, + buffers: map[string]*buffer{immediateAM.URL: newBuffer(10)}, + metrics: h.metrics, } - h.queue = append(h.queue, &Alert{ - Labels: labels.FromStrings("alertname", "test"), - }) - doneSendAll := make(chan struct{}) go func() { - h.sendAll(h.queue...) 
+ for _, s := range h.alertmanagers { + for _, am := range s.ams { + go s.sendLoop(am) + } + } + + h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "test"), + }) close(doneSendAll) }() @@ -1187,7 +1410,7 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) { } func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { // Do nothing, wait to be canceled. <-stop w.WriteHeader(http.StatusOK) @@ -1195,7 +1418,7 @@ func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server { } func newImmediateAlertManager(done chan<- struct{}) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) close(done) })) diff --git a/notifier/metric.go b/notifier/metric.go index 92f10cbe79..ff792d1322 100644 --- a/notifier/metric.go +++ b/notifier/metric.go @@ -19,13 +19,13 @@ type alertMetrics struct { latency *prometheus.SummaryVec errors *prometheus.CounterVec sent *prometheus.CounterVec - dropped prometheus.Counter - queueLength prometheus.GaugeFunc - queueCapacity prometheus.Gauge + dropped *prometheus.CounterVec + queueLength *prometheus.GaugeVec + queueCapacity *prometheus.GaugeVec alertmanagersDiscovered prometheus.GaugeFunc } -func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanagersDiscovered func() float64) *alertMetrics { +func newAlertMetrics(r prometheus.Registerer, alertmanagersDiscovered func() float64) *alertMetrics { m := &alertMetrics{ latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: namespace, @@ -52,32 +52,30 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag }, []string{alertmanagerLabel}, ), - dropped: prometheus.NewCounter(prometheus.CounterOpts{ + dropped: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "dropped_total", Help: "Total number of alerts dropped due to errors when sending to Alertmanager.", - }), - queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + }, []string{alertmanagerLabel}), + queueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_length", Help: "The number of alert notifications in the queue.", - }, queueLen), - queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ + }, []string{alertmanagerLabel}), + queueCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queue_capacity", Help: "The capacity of the alert notifications queue.", - }), + }, []string{alertmanagerLabel}), alertmanagersDiscovered: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_notifications_alertmanagers_discovered", Help: "The number of alertmanagers discovered and active.", }, alertmanagersDiscovered), } - m.queueCapacity.Set(float64(queueCap)) - if r != nil { r.MustRegister( m.latency,