From 233cc3905a2ed09372de4374061719d8c6fbdfdf Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 7 Mar 2024 12:17:46 -0800
Subject: [PATCH] add batchSize support for webhook endpoints (#19214)

Configure a batch size to send audit/logger events in batches instead of
one event per connection. This is mainly to reduce the number of requests
made to the webhook endpoint.
---
 internal/logger/config.go           | 34 ++++++++++++-
 internal/logger/help.go             | 12 +++++
 internal/logger/target/http/http.go | 78 ++++++++++++++++++++++-------
 3 files changed, 105 insertions(+), 19 deletions(-)

diff --git a/internal/logger/config.go b/internal/logger/config.go
index d5c6ed285..b2c8f3167 100644
--- a/internal/logger/config.go
+++ b/internal/logger/config.go
@@ -43,6 +43,7 @@ const (
 	AuthToken  = "auth_token"
 	ClientCert = "client_cert"
 	ClientKey  = "client_key"
+	BatchSize  = "batch_size"
 	QueueSize  = "queue_size"
 	QueueDir   = "queue_dir"
 	Proxy      = "proxy"
@@ -68,6 +69,7 @@ const (
 	EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
 	EnvLoggerWebhookClientKey  = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
 	EnvLoggerWebhookProxy      = "MINIO_LOGGER_WEBHOOK_PROXY"
+	EnvLoggerWebhookBatchSize  = "MINIO_LOGGER_WEBHOOK_BATCH_SIZE"
 	EnvLoggerWebhookQueueSize  = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
 	EnvLoggerWebhookQueueDir   = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"
 
@@ -76,6 +78,7 @@ const (
 	EnvAuditWebhookAuthToken  = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
 	EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
 	EnvAuditWebhookClientKey  = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
+	EnvAuditWebhookBatchSize  = "MINIO_AUDIT_WEBHOOK_BATCH_SIZE"
 	EnvAuditWebhookQueueSize  = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
 	EnvAuditWebhookQueueDir   = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"
 
@@ -99,7 +102,10 @@ const (
 	auditTargetNamePrefix = "audit-"
 )
 
-var errInvalidQueueSize = errors.New("invalid queue_size value")
+var (
+	errInvalidQueueSize = errors.New("invalid queue_size value")
+	errInvalidBatchSize = errors.New("invalid batch_size value")
+)
 
 // Default KVS for loggerHTTP and loggerAuditHTTP
 var (
@@ -128,6 +134,10 @@ var (
 			Key:   Proxy,
 			Value: "",
 		},
+		config.KV{
+			Key:   BatchSize,
+			Value: "1",
+		},
 		config.KV{
 			Key:   QueueSize,
 			Value: "100000",
@@ -159,6 +169,10 @@ var (
 			Key:   ClientKey,
 			Value: "",
 		},
+		config.KV{
+			Key:   BatchSize,
+			Value: "1",
+		},
 		config.KV{
 			Key:   QueueSize,
 			Value: "100000",
@@ -435,6 +449,14 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error)
 		if queueSize <= 0 {
 			return cfg, errInvalidQueueSize
 		}
+		batchSizeCfgVal := getCfgVal(EnvLoggerWebhookBatchSize, k, kv.Get(BatchSize))
+		batchSize, err := strconv.Atoi(batchSizeCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if batchSize <= 0 {
+			return cfg, errInvalidBatchSize
+		}
 		cfg.HTTP[k] = http.Config{
 			Enabled:    true,
 			Endpoint:   url,
@@ -442,6 +464,7 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error)
 			ClientCert: clientCert,
 			ClientKey:  clientKey,
 			Proxy:      getCfgVal(EnvLoggerWebhookProxy, k, kv.Get(Proxy)),
+			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvLoggerWebhookQueueDir, k, kv.Get(QueueDir)),
 			Name:       loggerTargetNamePrefix + k,
@@ -488,12 +511,21 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
 		if queueSize <= 0 {
 			return cfg, errInvalidQueueSize
 		}
+		batchSizeCfgVal := getCfgVal(EnvAuditWebhookBatchSize, k, kv.Get(BatchSize))
+		batchSize, err := strconv.Atoi(batchSizeCfgVal)
+		if err != nil {
+			return cfg, err
+		}
+		if batchSize <= 0 {
+			return cfg, errInvalidBatchSize
+		}
 		cfg.AuditWebhook[k] = http.Config{
 			Enabled:    true,
 			Endpoint:   url,
 			AuthToken:  getCfgVal(EnvAuditWebhookAuthToken, k, kv.Get(AuthToken)),
 			ClientCert: clientCert,
 			ClientKey:  clientKey,
+			BatchSize:  batchSize,
 			QueueSize:  queueSize,
 			QueueDir:   getCfgVal(EnvAuditWebhookQueueDir, k, kv.Get(QueueDir)),
 			Name:       auditTargetNamePrefix + k,
diff --git a/internal/logger/help.go b/internal/logger/help.go
index 9619cbbbe..b540774e7 100644
--- a/internal/logger/help.go
+++ b/internal/logger/help.go
@@ -52,6 +52,12 @@ var (
 			Type:        "string",
 			Sensitive:   true,
 		},
+		config.HelpKV{
+			Key:         BatchSize,
+			Description: "Number of events per HTTP send to webhook target",
+			Optional:    true,
+			Type:        "number",
+		},
 		config.HelpKV{
 			Key:         QueueSize,
 			Description: "configure channel queue size for webhook targets",
@@ -107,6 +113,12 @@ var (
 			Type:        "string",
 			Sensitive:   true,
 		},
+		config.HelpKV{
+			Key:         BatchSize,
+			Description: "Number of events per HTTP send to webhook target",
+			Optional:    true,
+			Type:        "number",
+		},
 		config.HelpKV{
 			Key:         QueueSize,
 			Description: "configure channel queue size for webhook targets",
diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go
index e8cefdf84..9dcc24f2e 100644
--- a/internal/logger/target/http/http.go
+++ b/internal/logger/target/http/http.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2023 MinIO, Inc.
+// Copyright (c) 2015-2024 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -33,12 +33,14 @@ import (
 	"sync/atomic"
 	"time"
 
+	jsoniter "github.com/json-iterator/go"
 	xhttp "github.com/minio/minio/internal/http"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger/target/types"
 	"github.com/minio/minio/internal/once"
 	"github.com/minio/minio/internal/store"
 	xnet "github.com/minio/pkg/v2/net"
+	"github.com/valyala/bytebufferpool"
 )
 
 const (
@@ -48,6 +50,9 @@ const (
 	// maxWorkers is the maximum number of concurrent http loggers
 	maxWorkers = 16
 
+	// maxWorkersWithBatchEvents is the maximum number of concurrent batch http loggers
+	maxWorkersWithBatchEvents = 4
+
 	// the suffix for the configured queue dir where the logs will be persisted.
 	httpLoggerExtension = ".http.log"
 )
@@ -67,6 +72,7 @@ type Config struct {
 	AuthToken  string `json:"authToken"`
 	ClientCert string `json:"clientCert"`
 	ClientKey  string `json:"clientKey"`
+	BatchSize  int    `json:"batchSize"`
 	QueueSize  int    `json:"queueSize"`
 	QueueDir   string `json:"queueDir"`
 	Proxy      string `json:"string"`
@@ -99,6 +105,13 @@ type Target struct {
 	logCh   chan interface{}
 	logChMu sync.RWMutex
 
+	// batchSize is the number of events per HTTP send to the
+	// webhook target. This is useful only if the endpoint can
+	// decode multiple events from a single stream, for example
+	// the Splunk HTTP Event Collector; if you are unsure, set
+	// this to '1'.
+	batchSize int
+
 	// If the first init fails, this starts a goroutine that
 	// will attempt to establish the connection.
 	revive sync.Once
@@ -135,7 +148,7 @@ func (h *Target) IsOnline(ctx context.Context) bool {
 
 // ping returns true if the target is reachable.
 func (h *Target) ping(ctx context.Context) bool {
-	if err := h.send(ctx, []byte(`{}`), webhookCallTimeout); err != nil {
+	if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil {
 		return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err)
 	}
 	// We are online.
@@ -213,7 +226,7 @@ func (h *Target) init(ctx context.Context) (err error) {
 	return nil
 }
 
-func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) {
+func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) {
 	defer func() {
 		if err != nil {
 			atomic.StoreInt32(&h.status, statusOffline)
@@ -229,7 +242,9 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
 	if err != nil {
 		return fmt.Errorf("invalid configuration for '%s'; %v", h.Endpoint(), err)
 	}
-	req.Header.Set(xhttp.ContentType, "application/json")
+	if payloadType != "" {
+		req.Header.Set(xhttp.ContentType, payloadType)
+	}
 	req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion)
 	req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID)
 
@@ -260,13 +275,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration
 	}
 }
 
-func (h *Target) logEntry(ctx context.Context, entry interface{}) {
-	logJSON, err := json.Marshal(&entry)
-	if err != nil {
-		atomic.AddInt64(&h.failedMessages, 1)
-		return
-	}
-
+func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) {
 	const maxTries = 3
 	tries := 0
 	for tries < maxTries {
@@ -281,7 +290,7 @@ func (h *Target) logEntry(ctx context.Context, entry interface{}) {
 		}
 		time.Sleep(sleep)
 		tries++
-		err := h.send(ctx, logJSON, webhookCallTimeout)
+		err := h.send(ctx, payload, payloadType, webhookCallTimeout)
 		if err == nil {
 			return
 		}
@@ -309,9 +318,36 @@ func (h *Target) startHTTPLogger(ctx context.Context) {
 		return
 	}
 
+	buf := bytebufferpool.Get()
+	defer bytebufferpool.Put(buf)
+
+	json := jsoniter.ConfigCompatibleWithStandardLibrary
+	enc := json.NewEncoder(buf)
+	batchSize := h.batchSize
+	if batchSize <= 0 {
+		batchSize = 1
+	}
+
+	payloadType := "application/json"
+	if batchSize > 1 {
+		payloadType = ""
+	}
+
+	var nevents int
 	// Send messages until channel is closed.
 	for entry := range logCh {
-		h.logEntry(ctx, entry)
+		atomic.AddInt64(&h.totalMessages, 1)
+		nevents++
+		if err := enc.Encode(&entry); err != nil {
+			atomic.AddInt64(&h.failedMessages, 1)
+			nevents--
+			continue
+		}
+		if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 {
+			h.logEntry(ctx, buf.Bytes(), payloadType)
+			buf.Reset()
+			nevents = 0
+		}
 	}
 }
 
@@ -319,9 +355,10 @@ func (h *Target) startHTTPLogger(ctx context.Context) {
 // sends log over http to the specified endpoint
 func New(config Config) *Target {
 	h := &Target{
-		logCh:  make(chan interface{}, config.QueueSize),
-		config: config,
-		status: statusOffline,
+		logCh:     make(chan interface{}, config.QueueSize),
+		config:    config,
+		status:    statusOffline,
+		batchSize: config.BatchSize,
 	}
 
 	// If proxy available, set the same
@@ -353,7 +390,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
 		atomic.AddInt64(&h.failedMessages, 1)
 		return
 	}
-	if err := h.send(context.Background(), logJSON, webhookCallTimeout); err != nil {
+	if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil {
 		atomic.AddInt64(&h.failedMessages, 1)
 		if xnet.IsNetworkOrHostDown(err, true) {
 			return store.ErrNotConnected
@@ -382,6 +419,11 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
 		return nil
 	}
 
+	mworkers := maxWorkers
+	if h.batchSize > 100 {
+		mworkers = maxWorkersWithBatchEvents
+	}
+
 retry:
 	select {
 	case h.logCh <- entry:
@@ -394,7 +436,7 @@ retry:
 		return nil
 	default:
 		nWorkers := atomic.LoadInt64(&h.workers)
-		if nWorkers < maxWorkers {
+		if nWorkers < int64(mworkers) {
 			// Only have one try to start at the same time.
 			h.workerStartMu.Lock()
 			if time.Since(h.lastStarted) > time.Second {
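
Usage note (illustrative, not part of the patch): batching is off by
default (batch_size=1) and is enabled per webhook target through the new
batch_size key or its BATCH_SIZE environment variable. The target-name
suffix on the env vars and the `mc admin config set` syntax below follow
MinIO's existing conventions for named webhook targets and are shown as
an assumption for illustration; only the batch_size key itself is added
by this patch:

    # hypothetical audit webhook target named "target1",
    # batching up to 100 events per request
    export MINIO_AUDIT_WEBHOOK_ENABLE_target1="on"
    export MINIO_AUDIT_WEBHOOK_ENDPOINT_target1="https://webhook.example.net/ingest"
    export MINIO_AUDIT_WEBHOOK_BATCH_SIZE_target1="100"

    # equivalent, via the config subsystem
    mc admin config set myminio/ audit_webhook:target1 \
        endpoint="https://webhook.example.net/ingest" batch_size="100"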
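
Receiver-side sketch: with batch_size > 1 the worker appends
newline-delimited JSON events into a pooled buffer and flushes it once
the batch is full or the channel is momentarily empty (so a partially
filled batch is not held back), and the request goes out without a
Content-Type header. The endpoint therefore has to decode a stream of
concatenated JSON events, as the Splunk HTTP Event Collector does. A
minimal example of such a receiver using only the Go standard library;
the server below is hypothetical and not part of this patch:

    package main

    import (
        "encoding/json"
        "io"
        "log"
        "net/http"
    )

    func main() {
        http.HandleFunc("/ingest", func(w http.ResponseWriter, r *http.Request) {
            // A single request body may carry one event or a whole batch;
            // json.Decoder consumes concatenated JSON values until EOF.
            dec := json.NewDecoder(r.Body)
            for {
                var event map[string]interface{}
                if err := dec.Decode(&event); err == io.EOF {
                    break
                } else if err != nil {
                    http.Error(w, err.Error(), http.StatusBadRequest)
                    return
                }
                log.Printf("received event: %v", event)
            }
            w.WriteHeader(http.StatusOK)
        })
        log.Fatal(http.ListenAndServe(":8080", nil))
    }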