ruler notifier: make batch size configurable (#16254)

* ruler notifier: make batch size configurable

In Mimir we experimented with setting a higher value for the batch size.
Increasing the batch size 4x roughly halved the time to process a single notification.
This reduces the processing time of the notifications queue and increases the throughput of the queue.

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update cmd/prometheus/main.go

Co-authored-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update docs

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Use a string constant

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add godoc comment on exported constant

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
Dimitar Dimitrov 2025-03-24 15:22:19 +01:00 committed by GitHub
parent 475092ff79
commit bd5b2ea95c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 20 additions and 3 deletions

View File

@ -518,6 +518,9 @@ func main() {
serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity)
serverOnlyFlag(a, "alertmanager.notification-batch-size", "The maximum number of notifications per batch to send to the Alertmanager.").
Default(strconv.Itoa(notifier.DefaultMaxBatchSize)).IntVar(&cfg.notifier.MaxBatchSize)
serverOnlyFlag(a, "alertmanager.drain-notification-queue-on-shutdown", "Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down.").
Default("true").BoolVar(&cfg.notifier.DrainOnShutdown)

View File

@ -55,6 +55,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
| <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| <code class="text-nowrap">--alertmanager.notification-batch-size</code> | The maximum number of notifications per batch to send to the Alertmanager. Use with server mode only. | `256` |
| <code class="text-nowrap">--alertmanager.drain-notification-queue-on-shutdown</code> | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |
| <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |

View File

@ -45,6 +45,9 @@ import (
)
const (
// DefaultMaxBatchSize is the default maximum number of alerts to send in a single request to the Alertmanager.
// NewManager uses this value when Options.MaxBatchSize is zero or negative.
DefaultMaxBatchSize = 256
// contentTypeJSON is the media type string for JSON content.
contentTypeJSON = "application/json"
)
@ -132,6 +135,9 @@ type Options struct {
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
Registerer prometheus.Registerer
// MaxBatchSize determines the maximum number of alerts to send in a single request to the alertmanager.
// Values less than or equal to zero are replaced with DefaultMaxBatchSize by NewManager.
MaxBatchSize int
}
type alertMetrics struct {
@ -224,6 +230,10 @@ func NewManager(o *Options, logger *slog.Logger) *Manager {
if o.Do == nil {
o.Do = do
}
// Fall back to DefaultMaxBatchSize when MaxBatchSize is unset or not positive.
if o.MaxBatchSize <= 0 {
o.MaxBatchSize = DefaultMaxBatchSize
}
if logger == nil {
logger = promslog.NewNopLogger()
}
@ -294,8 +304,6 @@ func (n *Manager) ApplyConfig(conf *config.Config) error {
return nil
}
const maxBatchSize = 64
func (n *Manager) queueLen() int {
n.mtx.RLock()
defer n.mtx.RUnlock()
@ -309,7 +317,7 @@ func (n *Manager) nextBatch() []*Alert {
var alerts []*Alert
if len(n.queue) > maxBatchSize {
if maxBatchSize := n.opts.MaxBatchSize; len(n.queue) > maxBatchSize {
alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:]
} else {

View File

@ -43,6 +43,8 @@ import (
"github.com/prometheus/prometheus/model/relabel"
)
// maxBatchSize is the batch size shared by the tests in this file: it is passed
// as Options.MaxBatchSize and used to derive QueueCapacity (3 * maxBatchSize).
const maxBatchSize = 256
func TestPostPath(t *testing.T) {
cases := []struct {
in, out string
@ -413,6 +415,7 @@ func TestCustomDo(t *testing.T) {
func TestExternalLabels(t *testing.T) {
h := NewManager(&Options{
QueueCapacity: 3 * maxBatchSize,
MaxBatchSize: maxBatchSize,
ExternalLabels: labels.FromStrings("a", "b"),
RelabelConfigs: []*relabel.Config{
{
@ -447,6 +450,7 @@ func TestExternalLabels(t *testing.T) {
func TestHandlerRelabel(t *testing.T) {
h := NewManager(&Options{
QueueCapacity: 3 * maxBatchSize,
MaxBatchSize: maxBatchSize,
RelabelConfigs: []*relabel.Config{
{
SourceLabels: model.LabelNames{"alertname"},
@ -525,6 +529,7 @@ func TestHandlerQueuing(t *testing.T) {
h := NewManager(
&Options{
QueueCapacity: 3 * maxBatchSize,
MaxBatchSize: maxBatchSize,
},
nil,
)