prometheus/notifier/alertmanagerset.go
Siavash Safi a89c665f47
feat(notifier): independent alertmanager sendloops (#16355)
* notifier: unit test for dropping throughput on stuck AM

Ref: https://github.com/prometheus/prometheus/issues/7676

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* chore(notifier): remove year from copyrights

Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* feat(notifier): independent alertmanager sendloops

Independent Alertmanager sendloops avoid issues with queue overflowing
when one or more Alertmanager instances are unavailable which could
result in lost alert notifications.
The sendloops are managed per AlertmanagerSet which are dynamically
added/removed with service discovery or configuration reload.

The following metrics now include an extra dimension for the alertmanager label:
- prometheus_notifications_dropped_total
- prometheus_notifications_queue_capacity
- prometheus_notifications_queue_length

This change also includes the test from #14099

Closes #7676

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>

---------

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
2026-01-20 10:33:07 +01:00

192 lines
5.1 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"crypto/md5"
"encoding/hex"
"fmt"
"log/slog"
"net/http"
"sync"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/sigv4"
"go.yaml.in/yaml/v2"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
)
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
//
// Each Alertmanager in the set gets its own send loop, stored in sendLoops
// under the Alertmanager's URL string.
type alertmanagerSet struct {
// cfg is the configuration shared by the whole set. It is used to resolve
// target groups, to relabel alerts before sending, and to build send loops.
cfg *config.AlertmanagerConfig
// client is the HTTP client handed to every send loop of this set.
client *http.Client
// opts is passed through unchanged to every send loop.
opts *Options
// metrics is passed through unchanged to every send loop.
metrics *alertMetrics
// mtx is held by sync and send while they read or modify ams, droppedAms
// and sendLoops. The send loop helper methods expect the caller to hold it.
mtx sync.RWMutex
// ams holds the Alertmanagers from the last sync, deduplicated by URL.
ams []alertmanager
// droppedAms holds the dropped Alertmanagers reported by the last sync.
droppedAms []alertmanager
// sendLoops maps an Alertmanager URL string to its send loop.
sendLoops map[string]*sendLoop
// logger is the base logger; send loops get a child logger that carries
// an "alertmanager" attribute.
logger *slog.Logger
}
// newAlertmanagerSet builds an alertmanagerSet for cfg.
//
// The HTTP client is created from cfg.HTTPClientConfig. When cfg.SigV4Config
// is set, the client's transport is wrapped in a SigV4 round tripper. The
// returned set starts with an empty send loop map and no Alertmanagers.
// Any error from building the client or the round tripper is returned as is.
func newAlertmanagerSet(cfg *config.AlertmanagerConfig, opts *Options, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager")
	if err != nil {
		return nil, err
	}

	// Only wrap the transport when request signing is configured.
	if cfg.SigV4Config != nil {
		signed, sigErr := sigv4.NewSigV4RoundTripper(cfg.SigV4Config, client.Transport)
		if sigErr != nil {
			return nil, sigErr
		}
		client.Transport = signed
	}

	return &alertmanagerSet{
		cfg:       cfg,
		client:    client,
		opts:      opts,
		metrics:   metrics,
		sendLoops: make(map[string]*sendLoop),
		logger:    logger,
	}, nil
}
// sync rebuilds the set's Alertmanagers from a list of target group
// definitions.
//
// Every group is resolved with AlertmanagerFromGroup; a group that fails to
// resolve is logged and skipped. The resulting Alertmanagers are deduplicated
// by URL, send loops are added for them, and the send loops of Alertmanagers
// that were present after the previous sync but are absent now are cleaned up.
func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
	var (
		discovered []alertmanager
		dropped    []alertmanager
	)
	for _, group := range tgs {
		groupAms, groupDropped, err := AlertmanagerFromGroup(group, s.cfg)
		if err != nil {
			s.logger.Error("Creating discovered Alertmanagers failed", "err", err)
			continue
		}
		discovered = append(discovered, groupAms...)
		dropped = append(dropped, groupDropped...)
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	previous := s.ams
	s.droppedAms = append([]alertmanager{}, dropped...)

	// Keep only the first Alertmanager seen for each URL.
	s.ams = []alertmanager{}
	current := map[string]struct{}{}
	for _, am := range discovered {
		key := am.url().String()
		if _, dup := current[key]; !dup {
			current[key] = struct{}{}
			s.ams = append(s.ams, am)
		}
	}
	s.addSendLoops(s.ams)

	// Clean up the send loops of Alertmanagers that disappeared, leaving
	// alone the ones that are still part of the set.
	for _, old := range previous {
		key := old.url().String()
		if _, kept := current[key]; !kept {
			current[key] = struct{}{}
			s.cleanSendLoops(old)
		}
	}
}
// configHash returns the hex-encoded MD5 digest of the set's configuration
// marshalled as YAML. A marshalling error is returned with an empty string.
func (s *alertmanagerSet) configHash() (string, error) {
	serialized, err := yaml.Marshal(s.cfg)
	if err != nil {
		return "", err
	}

	digest := md5.Sum(serialized)
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded), nil
}
// send hands the given alerts to every send loop of the set.
//
// If the configuration has alert relabel configs, they are applied first and
// nothing is sent when no alert is left afterwards. The set's mutex is held
// for the whole call.
func (s *alertmanagerSet) send(alerts ...*Alert) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if relabelCfgs := s.cfg.AlertRelabelConfigs; len(relabelCfgs) > 0 {
		if alerts = relabelAlerts(relabelCfgs, labels.Labels{}, alerts); len(alerts) == 0 {
			return
		}
	}

	for _, loop := range s.sendLoops {
		loop.add(alerts...)
	}
}
// addSendLoops creates a send loop for every given Alertmanager whose URL has
// none yet, starts it in its own goroutine and stores it under that URL.
// Alertmanagers that already have a send loop are skipped.
// This function expects the caller to acquire needed locks.
func (s *alertmanagerSet) addSendLoops(ams []alertmanager) {
	for _, am := range ams {
		key := am.url().String()

		existing, running := s.sendLoops[key]
		if running {
			existing.logger.Debug("Alertmanager already has send loop running, skipping")
			continue
		}

		fresh := newSendLoop(key, s.client, s.cfg, s.opts, s.logger.With("alertmanager", key), s.metrics)
		go fresh.loop()
		s.sendLoops[key] = fresh
	}
}
// cleanSendLoops stops the send loop of each given Alertmanager and removes it
// from the set. Alertmanagers without a send loop are ignored.
// This function expects the caller to acquire needed locks.
func (s *alertmanagerSet) cleanSendLoops(ams ...alertmanager) {
	for _, am := range ams {
		key := am.url().String()

		loop, running := s.sendLoops[key]
		if !running {
			continue
		}
		loop.stop()
		delete(s.sendLoops, key)
	}
}
// startSendLoops starts the already registered send loop of each given
// Alertmanager in its own goroutine. It panics if an Alertmanager has no send
// loop stored under its URL.
// This function expects the caller to acquire needed locks.
// This is mainly needed for testing where the loops are added as part of the test setup.
func (s *alertmanagerSet) startSendLoops(ams []alertmanager) {
	for _, am := range ams {
		key := am.url().String()

		loop, found := s.sendLoops[key]
		if !found {
			panic(fmt.Sprintf("send loop not found for %s", key))
		}
		go loop.loop()
	}
}