feat(notifier): independent alertmanager queues

Independent Alertmanager queues avoid overflowing the previously shared
notification queue when one or more Alertmanager instances are
unavailable, which could result in lost alert notifications.
The buffered queues are managed per AlertmanagerSet, with one queue per
Alertmanager instance; queues are dynamically added and removed as
instances appear or disappear through service discovery or configuration
reload.
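At a high level, and simplified from the diff below (this is a sketch, not the exact implementation), each set keeps one bounded buffer per Alertmanager URL and fans every batch of alerts out to all of them, so an unreachable instance only backs up its own queue:

type alertmanagerSet struct {
	buffers map[string]*buffer // one bounded queue per Alertmanager URL
}

func (s *alertmanagerSet) send(alerts ...*Alert) map[string]int {
	dropped := make(map[string]int)
	for url, b := range s.buffers {
		// push never blocks: when a queue is full the oldest alerts are
		// overwritten and counted as dropped for this Alertmanager only.
		dropped[url] += b.push(alerts...)
	}
	return dropped
}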

The following metrics now include an extra dimension for the alertmanager label:
- prometheus_notifications_dropped_total
- prometheus_notifications_queue_capacity
- prometheus_notifications_queue_length
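For example, a dropped alert is now attributed to the Alertmanager whose queue it was destined for instead of a single global counter (illustrative call sites; amURL, n, and buf are placeholders):

metrics.dropped.WithLabelValues(amURL).Add(float64(n))
metrics.queueLength.WithLabelValues(amURL).Set(float64(buf.len()))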

This change also includes the test from #14099

Closes #7676

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Siavash Safi 2025-06-10 11:58:07 +02:00
parent 333c0001e2
commit 2d5d239883
8 changed files with 845 additions and 370 deletions


@ -68,11 +68,26 @@ func (a *Alert) ResolvedAt(ts time.Time) bool {
return !a.EndsAt.After(ts) return !a.EndsAt.After(ts)
} }
// Copy returns a copy of the alert.
func (a *Alert) Copy() *Alert {
return &Alert{
Labels: a.Labels.Copy(),
Annotations: a.Annotations.Copy(),
StartsAt: a.StartsAt,
EndsAt: a.EndsAt,
GeneratorURL: a.GeneratorURL,
}
}
func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert { func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
lb := labels.NewBuilder(labels.EmptyLabels()) lb := labels.NewBuilder(labels.EmptyLabels())
var relabeledAlerts []*Alert var relabeledAlerts []*Alert
for _, a := range alerts { for _, s := range alerts {
// Copy the alert to avoid race condition between multiple alertmanagersets
// holding references to the same alerts.
a := s.Copy()
lb.Reset(a.Labels) lb.Reset(a.Labels)
externalLabels.Range(func(l labels.Label) { externalLabels.Range(func(l labels.Label) {
if a.Labels.Get(l.Name) == "" { if a.Labels.Get(l.Name) == "" {


@ -14,11 +14,17 @@
package notifier package notifier
import ( import (
"bytes"
"context"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog" "log/slog"
"net/http" "net/http"
"sync" "sync"
"time"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/sigv4" "github.com/prometheus/sigv4"
@ -26,6 +32,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
) )
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service // alertmanagerSet contains a set of Alertmanagers discovered via a group of service
@ -33,16 +40,18 @@ import (
type alertmanagerSet struct { type alertmanagerSet struct {
cfg *config.AlertmanagerConfig cfg *config.AlertmanagerConfig
client *http.Client client *http.Client
opts *Options
metrics *alertMetrics metrics *alertMetrics
mtx sync.RWMutex mtx sync.RWMutex
ams []alertmanager ams []alertmanager
droppedAms []alertmanager droppedAms []alertmanager
buffers map[string]*buffer
logger *slog.Logger logger *slog.Logger
} }
func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) { func newAlertmanagerSet(cfg *config.AlertmanagerConfig, opts *Options, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager") client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager")
if err != nil { if err != nil {
return nil, err return nil, err
@ -61,6 +70,8 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, met
s := &alertmanagerSet{ s := &alertmanagerSet{
client: client, client: client,
cfg: cfg, cfg: cfg,
opts: opts,
buffers: make(map[string]*buffer),
logger: logger, logger: logger,
metrics: metrics, metrics: metrics,
} }
@ -98,24 +109,31 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
continue continue
} }
// This will initialize the Counters for the AM to 0. // This will initialize the Counters for the AM to 0 and set the static queue capacity gauge.
s.metrics.sent.WithLabelValues(us) s.metrics.dropped.WithLabelValues(us)
s.metrics.errors.WithLabelValues(us) s.metrics.errors.WithLabelValues(us)
s.metrics.sent.WithLabelValues(us)
s.metrics.queueCapacity.WithLabelValues(us).Set(float64(s.opts.QueueCapacity))
seen[us] = struct{}{} seen[us] = struct{}{}
s.ams = append(s.ams, am) s.ams = append(s.ams, am)
} }
s.startSendLoops(allAms)
// Now remove counters for any removed Alertmanagers. // Now remove counters for any removed Alertmanagers.
for _, am := range previousAms { for _, am := range previousAms {
us := am.url().String() us := am.url().String()
if _, ok := seen[us]; ok { if _, ok := seen[us]; ok {
continue continue
} }
s.metrics.latency.DeleteLabelValues(us) s.metrics.dropped.DeleteLabelValues(us)
s.metrics.sent.DeleteLabelValues(us)
s.metrics.errors.DeleteLabelValues(us) s.metrics.errors.DeleteLabelValues(us)
s.metrics.latency.DeleteLabelValues(us)
s.metrics.queueLength.DeleteLabelValues(us)
s.metrics.sent.DeleteLabelValues(us)
seen[us] = struct{}{} seen[us] = struct{}{}
} }
s.cleanSendLoops(previousAms)
} }
func (s *alertmanagerSet) configHash() (string, error) { func (s *alertmanagerSet) configHash() (string, error) {
@ -126,3 +144,149 @@ func (s *alertmanagerSet) configHash() (string, error) {
hash := md5.Sum(b) hash := md5.Sum(b)
return hex.EncodeToString(hash[:]), nil return hex.EncodeToString(hash[:]), nil
} }
func (s *alertmanagerSet) send(alerts ...*Alert) map[string]int {
dropped := make(map[string]int)
if len(s.cfg.AlertRelabelConfigs) > 0 {
alerts = relabelAlerts(s.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
if len(alerts) == 0 {
return dropped
}
}
for am, q := range s.buffers {
d := q.push(alerts...)
dropped[am] += d
}
return dropped
}
// startSendLoops creates buffers for newly discovered Alertmanagers and
// starts a send loop for each.
// This function expects the caller to acquire needed locks.
func (s *alertmanagerSet) startSendLoops(all []alertmanager) {
for _, am := range all {
us := am.url().String()
// create new buffers and start send loops for new alertmanagers in the set.
if _, ok := s.buffers[us]; !ok {
s.buffers[us] = newBuffer(s.opts.QueueCapacity)
go s.sendLoop(am)
}
}
}
// cleanSendLoops stops the send loop for each removed Alertmanager by
// closing and removing its respective buffer.
// This function expects the caller to acquire needed locks.
func (s *alertmanagerSet) cleanSendLoops(removed []alertmanager) {
for _, am := range removed {
us := am.url().String()
s.buffers[us].close()
delete(s.buffers, us)
}
}
func (s *alertmanagerSet) sendLoop(am alertmanager) {
url := am.url().String()
// Allocate a reusable alerts slice with length and capacity equal to the max batch size.
alerts := make([]*Alert, s.opts.MaxBatchSize)
for {
b := s.getBuffer(url)
if b == nil {
return
}
_, ok := <-b.hasWork
if !ok {
return
}
b.pop(&alerts)
if !s.postNotifications(am, alerts) {
s.metrics.dropped.WithLabelValues(url).Add(float64(len(alerts)))
}
}
}
func (s *alertmanagerSet) postNotifications(am alertmanager, alerts []*Alert) bool {
if len(alerts) == 0 {
return true
}
begin := time.Now()
var payload []byte
var err error
switch s.cfg.APIVersion {
case config.AlertmanagerAPIVersionV2:
{
openAPIAlerts := alertsToOpenAPIAlerts(alerts)
payload, err = json.Marshal(openAPIAlerts)
if err != nil {
s.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err)
return false
}
}
default:
{
s.logger.Error(
fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", s.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
"err", err,
)
return false
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.cfg.Timeout))
defer cancel()
url := am.url().String()
if err := s.sendOne(ctx, s.client, url, payload); err != nil {
s.logger.Error("Error sending alerts", "alertmanager", url, "count", len(alerts), "err", err)
s.metrics.errors.WithLabelValues(url).Add(float64(len(alerts)))
return false
}
s.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
s.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
return true
}
func (s *alertmanagerSet) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := s.opts.Do(ctx, c, req)
if err != nil {
return err
}
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %s", resp.Status)
}
return nil
}
func (s *alertmanagerSet) getBuffer(url string) *buffer {
s.mtx.RLock()
defer s.mtx.RUnlock()
if q, ok := s.buffers[url]; ok {
return q
}
return nil
}


@ -0,0 +1,53 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"bytes"
"context"
"io"
"net/http"
"testing"
"github.com/stretchr/testify/require"
)
func TestCustomDo(t *testing.T) {
const testURL = "http://testurl.com/"
const testBody = "testbody"
var received bool
h := alertmanagerSet{
opts: &Options{
Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
received = true
body, err := io.ReadAll(req.Body)
require.NoError(t, err)
require.Equal(t, testBody, string(body))
require.Equal(t, testURL, req.URL.String())
return &http.Response{
Body: io.NopCloser(bytes.NewBuffer(nil)),
}, nil
},
},
}
h.sendOne(context.Background(), nil, testURL, []byte(testBody))
require.True(t, received, "Expected to receive an alert, but didn't")
}

notifier/buffer.go (new file, 124 lines)

@ -0,0 +1,124 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"sync"
"go.uber.org/atomic"
)
// buffer is a circular buffer for Alerts.
type buffer struct {
mtx sync.RWMutex
data []*Alert
size int
count int
readPointer int
writePointer int
hasWork chan struct{}
done chan struct{}
closed atomic.Bool
}
func newBuffer(size int) *buffer {
return &buffer{
data: make([]*Alert, size),
size: size,
hasWork: make(chan struct{}, 1),
done: make(chan struct{}, 1),
}
}
func (b *buffer) push(alerts ...*Alert) (dropped int) {
b.mtx.Lock()
defer b.mtx.Unlock()
for _, a := range alerts {
if b.count == b.size {
b.readPointer = (b.readPointer + 1) % b.size
dropped++
} else {
b.count++
}
b.data[b.writePointer] = a
b.writePointer = (b.writePointer + 1) % b.size
}
// If the buffer now has items, signal the send loop to start the next iteration.
if b.count > 0 {
b.notifyWork()
}
return
}
// pop moves alerts from the buffer into the passed slice.
// The number of moved alerts is min(buffered alerts, slice capacity).
// The slice length is adjusted accordingly.
func (b *buffer) pop(alerts *[]*Alert) {
b.mtx.Lock()
defer b.mtx.Unlock()
if b.count == 0 {
// Empty alerts from any cached data.
*alerts = (*alerts)[:0]
return
}
count := min(b.count, cap(*alerts))
*alerts = (*alerts)[0:count]
for i := range count {
(*alerts)[i] = b.data[b.readPointer]
b.data[b.readPointer] = nil
b.readPointer = (b.readPointer + 1) % b.size
b.count--
}
// If the buffer still has items left, kick off the next iteration.
if b.count > 0 {
b.notifyWork()
}
}
func (b *buffer) len() int {
b.mtx.RLock()
defer b.mtx.RUnlock()
return b.count
}
func (b *buffer) notifyWork() {
if b.isClosed() {
return
}
// Attempt to send a signal on the 'hasWork' channel if no signal is pending.
select {
case b.hasWork <- struct{}{}:
case <-b.done:
close(b.hasWork)
default:
// No action needed if the channel already has a pending signal.
}
}
func (b *buffer) close() {
b.done <- struct{}{}
b.closed.Store(true)
}
func (b *buffer) isClosed() bool {
return b.closed.Load()
}
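For reference, a minimal usage sketch of this buffer (hypothetical wiring; in the actual change the consumer side is alertmanagerSet.sendLoop):

b := newBuffer(1000)

// Consumer: block until work is signalled, then drain up to cap(batch)
// alerts per iteration; the loop exits once hasWork is closed.
go func() {
	batch := make([]*Alert, 64)
	for range b.hasWork {
		b.pop(&batch)
		// post the batch to a single Alertmanager here
	}
}()

// Producer: push never blocks; the return value is the number of old
// alerts that were overwritten because the ring was full.
dropped := b.push(&Alert{Labels: labels.FromStrings("alertname", "test")})
_ = dropped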

notifier/buffer_test.go (new file, 130 lines)

@ -0,0 +1,130 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
func TestPushAlertsToBuffer(t *testing.T) {
alert1 := &Alert{Labels: labels.FromStrings("alertname", "existing1")}
alert2 := &Alert{Labels: labels.FromStrings("alertname", "existing2")}
// Initialize a buffer with capacity 5 and 2 existing alerts
b := newBuffer(5)
b.push(alert1, alert2)
require.Equal(t, []*Alert{alert1, alert2, nil, nil, nil}, b.data)
require.Equal(t, 2, b.len(), "Expected buffer length of 2")
alert3 := &Alert{Labels: labels.FromStrings("alertname", "new1")}
alert4 := &Alert{Labels: labels.FromStrings("alertname", "new2")}
// Push new alerts to buffer, expect 0 dropped
require.Zero(t, b.push(alert3, alert4), "Expected 0 dropped alerts")
// Verify all new alerts were added to the buffer
require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, nil}, b.data)
require.Equal(t, 4, b.len(), "Expected buffer length of 4")
}
// Pushing alerts exceeding buffer capacity should drop oldest alerts.
func TestPushAlertsToBufferExceedingCapacity(t *testing.T) {
alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
// Initialize a buffer with capacity 3
b := newBuffer(3)
b.push(alert1, alert2)
alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
// Push new alerts to buffer, expect 1 dropped
require.Equal(t, 1, b.push(alert3, alert4), "Expected 1 dropped alerts")
// Verify all new alerts were added; alert4 wraps to the beginning of the buffer, overwriting alert1
require.Equal(t, []*Alert{alert4, alert2, alert3}, b.data, "Expected 3 alerts in the buffer")
}
// Pushing alerts exceeding the total buffer capacity should drop both old and newly pushed alerts.
func TestPushAlertsToBufferExceedingTotalCapacity(t *testing.T) {
alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
// Initialize a buffer with capacity 3
b := newBuffer(3)
b.push(alert1, alert2)
alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
alert6 := &Alert{Labels: labels.FromStrings("alertname", "alert6")}
// Push new alerts to buffer, expect 3 dropped: 1 from the new batch + 2 from existing buffered items
require.Equal(t, 3, b.push(alert3, alert4, alert5, alert6), "Expected 3 dropped alerts")
// Verify all new alerts were added to the buffer
require.Equal(t, []*Alert{alert4, alert5, alert6}, b.data, "Expected 3 alerts in the buffer")
}
func TestPopAlertsFromBuffer(t *testing.T) {
// Initialize a buffer with capacity 5
b := newBuffer(5)
alert1 := &Alert{Labels: labels.FromStrings("alertname", "alert1")}
alert2 := &Alert{Labels: labels.FromStrings("alertname", "alert2")}
alert3 := &Alert{Labels: labels.FromStrings("alertname", "alert3")}
b.push(alert1, alert2, alert3)
// Test 3 alerts in the buffer
result1 := make([]*Alert, 3)
b.pop(&result1)
require.Equal(t, []*Alert{alert1, alert2, alert3}, result1, "Expected all 3 alerts")
require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
require.Zero(t, b.len(), "Expected buffer length of 0")
b.pop(&result1)
require.Empty(t, result1, "Expected pop to return empty slice")
// Test full buffer
alert4 := &Alert{Labels: labels.FromStrings("alertname", "alert4")}
alert5 := &Alert{Labels: labels.FromStrings("alertname", "alert5")}
b.push(alert1, alert2, alert3, alert4, alert5)
result2 := make([]*Alert, 5)
b.pop(&result2)
require.Equal(t, []*Alert{alert1, alert2, alert3, alert4, alert5}, result2, "Expected all 5 alerts")
require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
require.Zero(t, b.len(), "Expected buffer length of 0")
b.pop(&result2)
require.Empty(t, result2, "Expected pop to return empty slice")
// Test smaller max size than capacity
b.push(alert1, alert2, alert3, alert4, alert5)
result3 := make([]*Alert, 3)
b.pop(&result3)
require.Equal(t, []*Alert{alert1, alert2, alert3}, result3, "Expected 3 first alerts from buffer")
require.Equal(t, 2, b.len(), "Expected buffer length of 2")
// Pop the remaining 2 alerts in buffer
result4 := make([]*Alert, 3)
b.pop(&result4)
require.Equal(t, []*Alert{alert4, alert5}, result4, "Expected 2 last alerts from buffer")
require.Equal(t, []*Alert{nil, nil, nil, nil, nil}, b.data, "Expected buffer with nil elements")
require.Zero(t, b.len(), "Expected buffer length of 0")
b.pop(&result4)
require.Empty(t, result4, "Expected pop to return empty slice")
}


@ -14,11 +14,8 @@
package notifier package notifier
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"net/http" "net/http"
"net/url" "net/url"
@ -54,13 +51,11 @@ var userAgent = version.PrometheusUserAgent()
// Manager is responsible for dispatching alert notifications to an // Manager is responsible for dispatching alert notifications to an
// alert manager service. // alert manager service.
type Manager struct { type Manager struct {
queue []*Alert opts *Options
opts *Options
metrics *alertMetrics metrics *alertMetrics
more chan struct{} mtx sync.RWMutex
mtx sync.RWMutex
stopOnce *sync.Once stopOnce *sync.Once
stopRequested chan struct{} stopRequested chan struct{}
@ -105,23 +100,15 @@ func NewManager(o *Options, logger *slog.Logger) *Manager {
} }
n := &Manager{ n := &Manager{
queue: make([]*Alert, 0, o.QueueCapacity),
more: make(chan struct{}, 1),
stopRequested: make(chan struct{}), stopRequested: make(chan struct{}),
stopOnce: &sync.Once{}, stopOnce: &sync.Once{},
opts: o, opts: o,
logger: logger, logger: logger,
} }
queueLenFunc := func() float64 { return float64(n.queueLen()) }
alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) } alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }
n.metrics = newAlertMetrics( n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc)
o.Registerer,
o.QueueCapacity,
queueLenFunc,
alertmanagersDiscoveredFunc,
)
return n return n
} }
@ -147,7 +134,7 @@ func (n *Manager) ApplyConfig(conf *config.Config) error {
} }
for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() { for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics) ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics)
if err != nil { if err != nil {
return err return err
} }
@ -170,77 +157,15 @@ func (n *Manager) ApplyConfig(conf *config.Config) error {
return nil return nil
} }
func (n *Manager) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
return len(n.queue)
}
func (n *Manager) nextBatch() []*Alert {
n.mtx.Lock()
defer n.mtx.Unlock()
var alerts []*Alert
if maxBatchSize := n.opts.MaxBatchSize; len(n.queue) > maxBatchSize {
alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:]
} else {
alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
}
return alerts
}
// Run dispatches notifications continuously, returning once Stop has been called and all // Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled). // pending notifications have been drained from the queue (if draining is enabled).
// //
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. // Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. // Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
wg := sync.WaitGroup{} n.targetUpdateLoop(tsets)
wg.Add(2) <-n.stopRequested
n.drainQueue()
go func() {
defer wg.Done()
n.targetUpdateLoop(tsets)
}()
go func() {
defer wg.Done()
n.sendLoop()
n.drainQueue()
}()
wg.Wait()
n.logger.Info("Notification manager stopped")
}
// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
func (n *Manager) sendLoop() {
for {
// If we've been asked to stop, that takes priority over sending any further notifications.
select {
case <-n.stopRequested:
return
default:
select {
case <-n.stopRequested:
return
case <-n.more:
n.sendOneBatch()
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
n.setMore()
}
}
}
}
} }
// targetUpdateLoop receives updates of target groups and triggers a reload. // targetUpdateLoop receives updates of target groups and triggers a reload.
@ -261,31 +186,40 @@ func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group)
} }
} }
func (n *Manager) sendOneBatch() {
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
n.metrics.dropped.Add(float64(len(alerts)))
}
}
func (n *Manager) drainQueue() { func (n *Manager) drainQueue() {
if !n.opts.DrainOnShutdown { if !n.opts.DrainOnShutdown {
if n.queueLen() > 0 { for _, ams := range n.alertmanagers {
n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) for am, b := range ams.buffers {
n.metrics.dropped.Add(float64(n.queueLen())) n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "alertmanager", am, "count", b.len())
n.metrics.dropped.WithLabelValues(am).Add(float64(b.len()))
b.close()
}
} }
return return
} }
n.logger.Info("Draining any remaining notifications...") n.logger.Info("Draining any remaining notifications...")
for n.queueLen() > 0 { drained := false
n.sendOneBatch() for !drained {
remain := false
for _, ams := range n.alertmanagers {
for am, b := range ams.buffers {
if b.len() > 0 {
remain = true
n.logger.Info("Remaining notifications to drain", "alertmanager", am, "count", b.len())
}
}
}
drained = !remain
time.Sleep(100 * time.Millisecond)
}
n.logger.Info("Remaining notifications drained, stopping send loops")
for _, ams := range n.alertmanagers {
for _, b := range ams.buffers {
b.close()
}
} }
n.logger.Info("Remaining notifications drained")
} }
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
@ -305,44 +239,23 @@ func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
// Send queues the given notification requests for processing. // Send queues the given notification requests for processing.
// Panics if called on a handler that is not running. // Panics if called on a handler that is not running.
func (n *Manager) Send(alerts ...*Alert) { func (n *Manager) Send(alerts ...*Alert) {
n.mtx.Lock() n.mtx.RLock()
defer n.mtx.Unlock() defer n.mtx.RUnlock()
alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts) alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
if len(alerts) == 0 { if len(alerts) == 0 {
return return
} }
// Queue capacity should be significantly larger than a single alert for _, ams := range n.alertmanagers {
// batch could be. dropped := ams.send(alerts...)
if d := len(alerts) - n.opts.QueueCapacity; d > 0 { for am, count := range dropped {
alerts = alerts[d:] n.logger.Warn("Notification queue is full, and some old notifications have been dropped", "alertmanager", am, "count", count)
n.metrics.dropped.WithLabelValues(am).Add(float64(count))
n.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) }
n.metrics.dropped.Add(float64(d)) for am, q := range ams.buffers {
} n.metrics.queueLength.WithLabelValues(am).Set(float64(q.len()))
}
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
n.logger.Warn("Alert notification queue full, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore()
}
// setMore signals that the alert queue has items.
func (n *Manager) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.more <- struct{}{}:
default:
} }
} }
@ -384,151 +297,6 @@ func (n *Manager) DroppedAlertmanagers() []*url.URL {
return res return res
} }
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(alerts) == 0 {
return true
}
begin := time.Now()
// cachedPayload represent 'alerts' marshaled for Alertmanager API v2.
// Marshaling happens below. Reference here is for caching between
// for loop iterations.
var cachedPayload []byte
n.mtx.RLock()
amSets := n.alertmanagers
n.mtx.RUnlock()
var (
wg sync.WaitGroup
amSetCovered sync.Map
)
for k, ams := range amSets {
var (
payload []byte
err error
amAlerts = alerts
)
ams.mtx.RLock()
if len(ams.ams) == 0 {
ams.mtx.RUnlock()
continue
}
if len(ams.cfg.AlertRelabelConfigs) > 0 {
amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
if len(amAlerts) == 0 {
ams.mtx.RUnlock()
continue
}
// We can't use the cached values from previous iteration.
cachedPayload = nil
}
switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV2:
{
if cachedPayload == nil {
openAPIAlerts := alertsToOpenAPIAlerts(amAlerts)
cachedPayload, err = json.Marshal(openAPIAlerts)
if err != nil {
n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err)
ams.mtx.RUnlock()
return false
}
}
payload = cachedPayload
}
default:
{
n.logger.Error(
fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
"err", err,
)
ams.mtx.RUnlock()
return false
}
}
if len(ams.cfg.AlertRelabelConfigs) > 0 {
// We can't use the cached values on the next iteration.
cachedPayload = nil
}
// Being here means len(ams.ams) > 0
amSetCovered.Store(k, false)
for _, am := range ams.ams {
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout))
defer cancel()
go func(ctx context.Context, k string, client *http.Client, url string, payload []byte, count int) {
err := n.sendOne(ctx, client, url, payload)
if err != nil {
n.logger.Error("Error sending alerts", "alertmanager", url, "count", count, "err", err)
n.metrics.errors.WithLabelValues(url).Add(float64(count))
} else {
amSetCovered.CompareAndSwap(k, false, true)
}
n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
n.metrics.sent.WithLabelValues(url).Add(float64(count))
wg.Done()
}(ctx, k, ams.client, am.url().String(), payload, len(amAlerts))
}
ams.mtx.RUnlock()
}
wg.Wait()
// Return false if there are any sets which were attempted (e.g. not filtered
// out) but have no successes.
allAmSetsCovered := true
amSetCovered.Range(func(_, value any) bool {
if !value.(bool) {
allAmSetsCovered = false
return false
}
return true
})
return allAmSetsCovered
}
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {
return err
}
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %s", resp.Status)
}
return nil
}
// Stop signals the notification manager to shut down and immediately returns. // Stop signals the notification manager to shut down and immediately returns.
// //
// Run will return once the notification manager has successfully shut down. // Run will return once the notification manager has successfully shut down.


@ -14,11 +14,11 @@
package notifier package notifier
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/common/promslog" "github.com/prometheus/common/promslog"
@ -44,21 +45,42 @@ import (
const maxBatchSize = 256 const maxBatchSize = 256
func TestHandlerNextBatch(t *testing.T) { func TestHandlerSendBatch(t *testing.T) {
h := NewManager(&Options{}, nil) h := NewManager(&Options{}, nil)
b := newBuffer(10_000)
h.alertmanagers = map[string]*alertmanagerSet{
"mock": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return "http://mock" },
},
},
cfg: &config.DefaultAlertmanagerConfig,
buffers: map[string]*buffer{"http://mock": b},
},
}
var alerts []*Alert
for i := range make([]struct{}, 2*maxBatchSize+1) { for i := range make([]struct{}, 2*maxBatchSize+1) {
h.queue = append(h.queue, &Alert{ alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)), Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
}) })
} }
h.Send(alerts...)
expected := append([]*Alert{}, h.queue...) expected := append([]*Alert{}, alerts...)
require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) batch := make([]*Alert, maxBatchSize)
require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch()))
require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) b.pop(&batch)
require.Empty(t, h.queue, "Expected queue to be empty but got %d alerts", len(h.queue)) require.NoError(t, alertsEqual(expected[0:maxBatchSize], batch))
b.pop(&batch)
require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], batch))
b.pop(&batch)
require.NoError(t, alertsEqual(expected[2*maxBatchSize:], batch))
} }
func alertsEqual(a, b []*Alert) error { func alertsEqual(a, b []*Alert) error {
@ -108,11 +130,21 @@ func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string
})) }))
} }
func getCounterValue(t *testing.T, metric *prometheus.CounterVec, labels ...string) float64 {
t.Helper()
m := &dto.Metric{}
if err := metric.WithLabelValues(labels...).Write(m); err != nil {
t.Fatal(err)
}
return m.Counter.GetValue()
}
func TestHandlerSendAll(t *testing.T) { func TestHandlerSendAll(t *testing.T) {
var ( var (
errc = make(chan error, 1) errc = make(chan error, 1)
expected = make([]*Alert, 0, maxBatchSize) expected = make([]*Alert, 0)
status1, status2, status3 atomic.Int32 status1, status2, status3 atomic.Int32
errors1, errors2, errors3 float64
) )
status1.Store(int32(http.StatusOK)) status1.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK))
@ -146,14 +178,21 @@ func TestHandlerSendAll(t *testing.T) {
am3Cfg := config.DefaultAlertmanagerConfig am3Cfg := config.DefaultAlertmanagerConfig
am3Cfg.Timeout = model.Duration(time.Second) am3Cfg.Timeout = model.Duration(time.Second)
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize}
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
h.alertmanagers["1"] = &alertmanagerSet{ h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{ ams: []alertmanager{
alertmanagerMock{ alertmanagerMock{
urlf: func() string { return server1.URL }, urlf: func() string { return server1.URL },
}, },
}, },
cfg: &am1Cfg, cfg: &am1Cfg,
client: authClient, client: authClient,
buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)},
opts: opts,
metrics: h.metrics,
logger: logger,
} }
h.alertmanagers["2"] = &alertmanagerSet{ h.alertmanagers["2"] = &alertmanagerSet{
@ -166,15 +205,27 @@ func TestHandlerSendAll(t *testing.T) {
}, },
}, },
cfg: &am2Cfg, cfg: &am2Cfg,
buffers: map[string]*buffer{
server2.URL: newBuffer(opts.QueueCapacity),
server3.URL: newBuffer(opts.QueueCapacity),
},
opts: opts,
metrics: h.metrics,
logger: logger,
} }
h.alertmanagers["3"] = &alertmanagerSet{ h.alertmanagers["3"] = &alertmanagerSet{
ams: []alertmanager{}, // empty set ams: []alertmanager{}, // empty set
cfg: &am3Cfg, cfg: &am3Cfg,
buffers: make(map[string]*buffer),
opts: opts,
metrics: h.metrics,
logger: logger,
} }
var alerts []*Alert
for i := range make([]struct{}, maxBatchSize) { for i := range make([]struct{}, maxBatchSize) {
h.queue = append(h.queue, &Alert{ alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)), Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
}) })
expected = append(expected, &Alert{ expected = append(expected, &Alert{
@ -191,37 +242,88 @@ func TestHandlerSendAll(t *testing.T) {
} }
} }
// start send loops
for _, ams := range h.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
}
// all ams in all sets are up // all ams in all sets are up
require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
// snapshot error metrics and check them
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.Zero(t, errors1, "server1 has unexpected send errors")
require.Zero(t, errors2, "server2 has unexpected send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// the only am in set 1 is down // the only am in set 1 is down
status1.Store(int32(http.StatusNotFound)) status1.Store(int32(http.StatusNotFound))
require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.NotZero(t, errors1, "server1 has no send errors")
require.Zero(t, errors2, "server2 has unexpected send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// reset it // reset it
status1.Store(int32(http.StatusOK)) status1.Store(int32(http.StatusOK))
// reset metrics
h.metrics.errors.Reset()
// only one of the ams in set 2 is down // only one of the ams in set 2 is down
status2.Store(int32(http.StatusInternalServerError)) status2.Store(int32(http.StatusInternalServerError))
require.True(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.Zero(t, errors1, "server1 has unexpected send errors")
require.NotZero(t, errors2, "server2 has no send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// both ams in set 2 are down // both ams in set 2 are down
status3.Store(int32(http.StatusInternalServerError)) status3.Store(int32(http.StatusInternalServerError))
require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.Zero(t, errors1, "server1 has unexpected send errors")
require.NotZero(t, errors2, "server2 has no send errors")
require.NotZero(t, errors3, "server3 has no send errors")
checkNoErr() checkNoErr()
// stop send routines by closing buffers
for _, ams := range h.alertmanagers {
for _, q := range ams.buffers {
q.close()
}
}
} }
func TestHandlerSendAllRemapPerAm(t *testing.T) { func TestHandlerSendAllRemapPerAm(t *testing.T) {
var ( var (
errc = make(chan error, 1) errc = make(chan error, 1)
expected1 = make([]*Alert, 0, maxBatchSize) expected1 = make([]*Alert, 0)
expected2 = make([]*Alert, 0, maxBatchSize) expected2 = make([]*Alert, 0)
expected3 = make([]*Alert, 0) expected3 = make([]*Alert, 0)
status1, status2, status3 atomic.Int32 status1, status2, status3 atomic.Int32
errors1, errors2, errors3 float64
) )
status1.Store(int32(http.StatusOK)) status1.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK)) status2.Store(int32(http.StatusOK))
@ -261,6 +363,9 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
}, },
} }
opts := &Options{Do: do, QueueCapacity: 10_000, MaxBatchSize: maxBatchSize}
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
h.alertmanagers = map[string]*alertmanagerSet{ h.alertmanagers = map[string]*alertmanagerSet{
// Drop no alerts. // Drop no alerts.
"1": { "1": {
@ -269,7 +374,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
urlf: func() string { return server1.URL }, urlf: func() string { return server1.URL },
}, },
}, },
cfg: &am1Cfg, cfg: &am1Cfg,
buffers: map[string]*buffer{server1.URL: newBuffer(opts.QueueCapacity)},
opts: opts,
metrics: h.metrics,
logger: logger,
}, },
// Drop only alerts with the "alertnamedrop" label. // Drop only alerts with the "alertnamedrop" label.
"2": { "2": {
@ -278,7 +387,11 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
urlf: func() string { return server2.URL }, urlf: func() string { return server2.URL },
}, },
}, },
cfg: &am2Cfg, cfg: &am2Cfg,
buffers: map[string]*buffer{server2.URL: newBuffer(opts.QueueCapacity)},
opts: opts,
metrics: h.metrics,
logger: logger,
}, },
// Drop all alerts. // Drop all alerts.
"3": { "3": {
@ -287,17 +400,26 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
urlf: func() string { return server3.URL }, urlf: func() string { return server3.URL },
}, },
}, },
cfg: &am3Cfg, cfg: &am3Cfg,
buffers: map[string]*buffer{server3.URL: newBuffer(opts.QueueCapacity)},
opts: opts,
metrics: h.metrics,
logger: logger,
}, },
// Empty list of Alertmanager endpoints. // Empty list of Alertmanager endpoints.
"4": { "4": {
ams: []alertmanager{}, ams: []alertmanager{},
cfg: &config.DefaultAlertmanagerConfig, cfg: &config.DefaultAlertmanagerConfig,
buffers: make(map[string]*buffer),
opts: opts,
metrics: h.metrics,
logger: logger,
}, },
} }
var alerts []*Alert
for i := range make([]struct{}, maxBatchSize/2) { for i := range make([]struct{}, maxBatchSize/2) {
h.queue = append(h.queue, alerts = append(alerts,
&Alert{ &Alert{
Labels: labels.FromStrings("alertname", strconv.Itoa(i)), Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
}, },
@ -328,24 +450,66 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
} }
} }
// start send loops
for _, ams := range h.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
}
// all ams are up // all ams are up
require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
// snapshot error metrics and check them
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.Zero(t, errors1, "server1 has unexpected send errors")
require.Zero(t, errors2, "server2 has unexpected send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// the only am in set 1 goes down // the only am in set 1 goes down
status1.Store(int32(http.StatusInternalServerError)) status1.Store(int32(http.StatusInternalServerError))
require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") h.Send(alerts...)
time.Sleep(time.Second)
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.NotZero(t, errors1, "server1 has no send errors")
require.Zero(t, errors2, "server2 has unexpected send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// reset set 1 // reset set 1
status1.Store(int32(http.StatusOK)) status1.Store(int32(http.StatusOK))
// reset metrics
h.metrics.errors.Reset()
// set 3 loses its only am, but all alerts were dropped // set 3 loses its only am, but all alerts were dropped
// so there was nothing to send, keeping sendAll true // so there was nothing to send, keeping sendAll true
status3.Store(int32(http.StatusInternalServerError)) status3.Store(int32(http.StatusInternalServerError))
require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") h.Send(alerts...)
time.Sleep(3 * time.Second)
errors1 = getCounterValue(t, h.metrics.errors, server1.URL)
errors2 = getCounterValue(t, h.metrics.errors, server2.URL)
errors3 = getCounterValue(t, h.metrics.errors, server3.URL)
require.Zero(t, errors1, "server1 has unexpected send errors")
require.Zero(t, errors2, "server2 has unexpected send errors")
require.Zero(t, errors3, "server3 has unexpected send errors")
checkNoErr() checkNoErr()
// stop send routines by closing buffers
for _, ams := range h.alertmanagers {
for _, q := range ams.buffers {
q.close()
}
}
// Verify that individual locks are released. // Verify that individual locks are released.
for k := range h.alertmanagers { for k := range h.alertmanagers {
h.alertmanagers[k].mtx.Lock() h.alertmanagers[k].mtx.Lock()
@ -354,33 +518,6 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
} }
} }
func TestCustomDo(t *testing.T) {
const testURL = "http://testurl.com/"
const testBody = "testbody"
var received bool
h := NewManager(&Options{
Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
received = true
body, err := io.ReadAll(req.Body)
require.NoError(t, err)
require.Equal(t, testBody, string(body))
require.Equal(t, testURL, req.URL.String())
return &http.Response{
Body: io.NopCloser(bytes.NewBuffer(nil)),
}, nil
},
}, nil)
h.sendOne(context.Background(), nil, testURL, []byte(testBody))
require.True(t, received, "Expected to receive an alert, but didn't")
}
func TestExternalLabels(t *testing.T) { func TestExternalLabels(t *testing.T) {
h := NewManager(&Options{ h := NewManager(&Options{
QueueCapacity: 3 * maxBatchSize, QueueCapacity: 3 * maxBatchSize,
@ -397,6 +534,16 @@ func TestExternalLabels(t *testing.T) {
}, },
}, nil) }, nil)
queue := newBuffer(h.opts.QueueCapacity)
h.alertmanagers = map[string]*alertmanagerSet{
"test": {
buffers: map[string]*buffer{"test": queue},
cfg: &config.AlertmanagerConfig{
RelabelConfigs: h.opts.RelabelConfigs,
},
},
}
// This alert should get the external label attached. // This alert should get the external label attached.
h.Send(&Alert{ h.Send(&Alert{
Labels: labels.FromStrings("alertname", "test"), Labels: labels.FromStrings("alertname", "test"),
@ -408,12 +555,15 @@ func TestExternalLabels(t *testing.T) {
Labels: labels.FromStrings("alertname", "externalrelabelthis"), Labels: labels.FromStrings("alertname", "externalrelabelthis"),
}) })
alerts := make([]*Alert, maxBatchSize)
queue.pop(&alerts)
expected := []*Alert{ expected := []*Alert{
{Labels: labels.FromStrings("alertname", "test", "a", "b")}, {Labels: labels.FromStrings("alertname", "test", "a", "b")},
{Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")},
} }
require.NoError(t, alertsEqual(expected, h.queue)) require.NoError(t, alertsEqual(expected, alerts))
} }
func TestHandlerRelabel(t *testing.T) { func TestHandlerRelabel(t *testing.T) {
@ -436,6 +586,16 @@ func TestHandlerRelabel(t *testing.T) {
}, },
}, nil) }, nil)
queue := newBuffer(h.opts.QueueCapacity)
h.alertmanagers = map[string]*alertmanagerSet{
"test": {
buffers: map[string]*buffer{"test": queue},
cfg: &config.AlertmanagerConfig{
RelabelConfigs: h.opts.RelabelConfigs,
},
},
}
// This alert should be dropped due to the configuration // This alert should be dropped due to the configuration
h.Send(&Alert{ h.Send(&Alert{
Labels: labels.FromStrings("alertname", "drop"), Labels: labels.FromStrings("alertname", "drop"),
@ -446,11 +606,14 @@ func TestHandlerRelabel(t *testing.T) {
Labels: labels.FromStrings("alertname", "rename"), Labels: labels.FromStrings("alertname", "rename"),
}) })
alerts := make([]*Alert, maxBatchSize)
queue.pop(&alerts)
expected := []*Alert{ expected := []*Alert{
{Labels: labels.FromStrings("alertname", "renamed")}, {Labels: labels.FromStrings("alertname", "renamed")},
} }
require.NoError(t, alertsEqual(expected, h.queue)) require.NoError(t, alertsEqual(expected, alerts))
} }
func TestHandlerQueuing(t *testing.T) { func TestHandlerQueuing(t *testing.T) {
@ -514,8 +677,19 @@ func TestHandlerQueuing(t *testing.T) {
urlf: func() string { return server.URL }, urlf: func() string { return server.URL },
}, },
}, },
cfg: &am1Cfg, cfg: &am1Cfg,
buffers: map[string]*buffer{server.URL: newBuffer(h.opts.QueueCapacity)},
metrics: h.metrics,
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
} }
for _, ams := range h.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
}
go h.Run(nil) go h.Run(nil)
defer h.Stop() defer h.Stop()
@ -706,6 +880,7 @@ func makeInputTargetGroup() *targetgroup.Group {
// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are // TestHangingNotifier ensures that the notifier takes into account SD changes even when there are
// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676. // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
// and https://github.com/prometheus/prometheus/issues/8768. // and https://github.com/prometheus/prometheus/issues/8768.
// TODO: Drop this test as we have independent queues per alertmanager now.
func TestHangingNotifier(t *testing.T) { func TestHangingNotifier(t *testing.T) {
const ( const (
batches = 100 batches = 100
@ -782,7 +957,20 @@ func TestHangingNotifier(t *testing.T) {
}, },
cfg: &amCfg, cfg: &amCfg,
metrics: notifier.metrics, metrics: notifier.metrics,
buffers: map[string]*buffer{
faultyURL.String(): newBuffer(notifier.opts.QueueCapacity),
functionalURL.String(): newBuffer(notifier.opts.QueueCapacity),
},
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
} }
for _, ams := range notifier.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
}
go notifier.Run(sdManager.SyncCh()) go notifier.Run(sdManager.SyncCh())
defer notifier.Stop() defer notifier.Stop()
@ -842,10 +1030,13 @@ loop2:
// The faulty alertmanager was dropped. // The faulty alertmanager was dropped.
if len(notifier.Alertmanagers()) == 1 { if len(notifier.Alertmanagers()) == 1 {
// Prevent from TOCTOU. // Prevent from TOCTOU.
require.Positive(t, notifier.queueLen()) for _, ams := range notifier.alertmanagers {
for _, q := range ams.buffers {
require.Zero(t, q.len())
}
}
break loop2 break loop2
} }
require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.")
} }
} }
} }
@ -897,7 +1088,17 @@ func TestStop_DrainingDisabled(t *testing.T) {
urlf: func() string { return server.URL }, urlf: func() string { return server.URL },
}, },
}, },
cfg: &am1Cfg, cfg: &am1Cfg,
buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)},
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
metrics: newAlertMetrics(prometheus.DefaultRegisterer, nil),
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
}
for _, ams := range m.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
} }
notificationManagerStopped := make(chan struct{}) notificationManagerStopped := make(chan struct{})
@ -933,7 +1134,8 @@ func TestStop_DrainingDisabled(t *testing.T) {
require.FailNow(t, "gave up waiting for notification manager to stop") require.FailNow(t, "gave up waiting for notification manager to stop")
} }
require.Equal(t, int64(1), alertsReceived.Load()) // At least one alert must have been delivered before notification manager stops.
require.Positive(t, alertsReceived.Load())
} }
func TestStop_DrainingEnabled(t *testing.T) { func TestStop_DrainingEnabled(t *testing.T) {
@ -942,9 +1144,6 @@ func TestStop_DrainingEnabled(t *testing.T) {
alertsReceived := atomic.NewInt64(0) alertsReceived := atomic.NewInt64(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}
var alerts []*Alert var alerts []*Alert
b, err := io.ReadAll(r.Body) b, err := io.ReadAll(r.Body)
@ -955,6 +1154,9 @@ func TestStop_DrainingEnabled(t *testing.T) {
alertsReceived.Add(int64(len(alerts))) alertsReceived.Add(int64(len(alerts)))
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}
// Wait for the test to release us. // Wait for the test to release us.
<-releaseReceiver <-releaseReceiver
@ -983,7 +1185,17 @@ func TestStop_DrainingEnabled(t *testing.T) {
urlf: func() string { return server.URL }, urlf: func() string { return server.URL },
}, },
}, },
cfg: &am1Cfg, cfg: &am1Cfg,
buffers: map[string]*buffer{server.URL: newBuffer(m.opts.QueueCapacity)},
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
metrics: m.metrics,
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
}
for _, ams := range m.alertmanagers {
for _, am := range ams.ams {
go ams.sendLoop(am)
}
} }
notificationManagerStopped := make(chan struct{}) notificationManagerStopped := make(chan struct{})
@ -1013,10 +1225,11 @@ func TestStop_DrainingEnabled(t *testing.T) {
select { select {
case <-notificationManagerStopped: case <-notificationManagerStopped:
// Nothing more to do. // Nothing more to do.
case <-time.After(200 * time.Millisecond): case <-time.After(400 * time.Millisecond):
require.FailNow(t, "gave up waiting for notification manager to stop") require.FailNow(t, "gave up waiting for notification manager to stop")
} }
<-receiverReceivedRequest
require.Equal(t, int64(2), alertsReceived.Load()) require.Equal(t, int64(2), alertsReceived.Load())
} }
@ -1149,7 +1362,10 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) {
urlf: func() string { return blackHoleAM.URL }, urlf: func() string { return blackHoleAM.URL },
}, },
}, },
cfg: &amCfg, cfg: &amCfg,
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
buffers: map[string]*buffer{blackHoleAM.URL: newBuffer(10)},
metrics: h.metrics,
} }
h.alertmanagers["2"] = &alertmanagerSet{ h.alertmanagers["2"] = &alertmanagerSet{
@ -1158,16 +1374,23 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) {
urlf: func() string { return immediateAM.URL }, urlf: func() string { return immediateAM.URL },
}, },
}, },
cfg: &amCfg, cfg: &amCfg,
opts: &Options{Do: do, MaxBatchSize: maxBatchSize},
buffers: map[string]*buffer{immediateAM.URL: newBuffer(10)},
metrics: h.metrics,
} }
h.queue = append(h.queue, &Alert{
Labels: labels.FromStrings("alertname", "test"),
})
doneSendAll := make(chan struct{}) doneSendAll := make(chan struct{})
go func() { go func() {
h.sendAll(h.queue...) for _, s := range h.alertmanagers {
for _, am := range s.ams {
go s.sendLoop(am)
}
}
h.Send(&Alert{
Labels: labels.FromStrings("alertname", "test"),
})
close(doneSendAll) close(doneSendAll)
}() }()
@ -1187,7 +1410,7 @@ func TestNotifierQueueIndependentOfFailedAlertmanager(t *testing.T) {
} }
func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server { func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
// Do nothing, wait to be canceled. // Do nothing, wait to be canceled.
<-stop <-stop
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@ -1195,7 +1418,7 @@ func newBlackHoleAlertmanager(stop <-chan struct{}) *httptest.Server {
} }
func newImmediateAlertManager(done chan<- struct{}) *httptest.Server { func newImmediateAlertManager(done chan<- struct{}) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
close(done) close(done)
})) }))


@ -19,13 +19,13 @@ type alertMetrics struct {
latency *prometheus.SummaryVec latency *prometheus.SummaryVec
errors *prometheus.CounterVec errors *prometheus.CounterVec
sent *prometheus.CounterVec sent *prometheus.CounterVec
dropped prometheus.Counter dropped *prometheus.CounterVec
queueLength prometheus.GaugeFunc queueLength *prometheus.GaugeVec
queueCapacity prometheus.Gauge queueCapacity *prometheus.GaugeVec
alertmanagersDiscovered prometheus.GaugeFunc alertmanagersDiscovered prometheus.GaugeFunc
} }
func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanagersDiscovered func() float64) *alertMetrics { func newAlertMetrics(r prometheus.Registerer, alertmanagersDiscovered func() float64) *alertMetrics {
m := &alertMetrics{ m := &alertMetrics{
latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{ latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace, Namespace: namespace,
@ -52,32 +52,30 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag
}, },
[]string{alertmanagerLabel}, []string{alertmanagerLabel},
), ),
dropped: prometheus.NewCounter(prometheus.CounterOpts{ dropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "dropped_total", Name: "dropped_total",
Help: "Total number of alerts dropped due to errors when sending to Alertmanager.", Help: "Total number of alerts dropped due to errors when sending to Alertmanager.",
}), }, []string{alertmanagerLabel}),
queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ queueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "queue_length", Name: "queue_length",
Help: "The number of alert notifications in the queue.", Help: "The number of alert notifications in the queue.",
}, queueLen), }, []string{alertmanagerLabel}),
queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ queueCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "queue_capacity", Name: "queue_capacity",
Help: "The capacity of the alert notifications queue.", Help: "The capacity of the alert notifications queue.",
}), }, []string{alertmanagerLabel}),
alertmanagersDiscovered: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ alertmanagersDiscovered: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_notifications_alertmanagers_discovered", Name: "prometheus_notifications_alertmanagers_discovered",
Help: "The number of alertmanagers discovered and active.", Help: "The number of alertmanagers discovered and active.",
}, alertmanagersDiscovered), }, alertmanagersDiscovered),
} }
m.queueCapacity.Set(float64(queueCap))
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
m.latency, m.latency,