From 1fc4203c1982000162e586c3a13bbfb06de08e43 Mon Sep 17 00:00:00 2001 From: Sveinn Date: Mon, 25 Mar 2024 16:44:20 +0000 Subject: [PATCH] Webhook targets refactor and bug fixes (#19275) - old version was unable to retain messages during config reload - old version could not go from memory to disk during reload - new version can batch disk queue entries into a single file to reduce I/O load - error logging has been improved; the previous version would miss certain errors. - logic for spawning/despawning additional workers has been adjusted to trigger when half capacity is reached, instead of when the log queue becomes full. - old version would JSON-marshal twice and unmarshal once for every log item. Now we marshal only once and then we GetRaw from the store and send it without having to re-marshal. --- cmd/config-current.go | 8 +- internal/logger/target/http/http.go | 466 +++++++++++++++++----------- internal/logger/targets.go | 108 ++++--- internal/store/queuestore.go | 75 +++++ internal/store/store.go | 20 +- 5 files changed, 440 insertions(+), 237 deletions(-) diff --git a/cmd/config-current.go b/cmd/config-current.go index d84d823bf..c344fadbf 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -620,13 +620,13 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := getUserAgent(getMinioMode()) for n, l := range loggerCfg.HTTP { if l.Enabled { - l.LogOnce = logger.LogOnceConsoleIf + l.LogOnceIf = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) } loggerCfg.HTTP[n] = l } - if errs := logger.UpdateSystemTargets(ctx, loggerCfg); len(errs) > 0 { + if errs := logger.UpdateHTTPWebhooks(ctx, loggerCfg.HTTP); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs)) } case config.AuditWebhookSubSys: @@ -637,14 +637,14 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := 
getUserAgent(getMinioMode()) for n, l := range loggerCfg.AuditWebhook { if l.Enabled { - l.LogOnce = logger.LogOnceConsoleIf + l.LogOnceIf = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) } loggerCfg.AuditWebhook[n] = l } - if errs := logger.UpdateAuditWebhookTargets(ctx, loggerCfg); len(errs) > 0 { + if errs := logger.UpdateAuditWebhooks(ctx, loggerCfg.AuditWebhook); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs)) } case config.AuditKafkaSubSys: diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 9dcc24f2e..8efa14d8e 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -20,11 +20,8 @@ package http import ( "bytes" "context" - "encoding/json" "errors" "fmt" - "math" - "math/rand" "net/http" "net/url" "os" @@ -63,6 +60,11 @@ const ( statusClosed ) +var ( + logChBuffers = make(map[string]chan interface{}) + logChLock = sync.Mutex{} +) + // Config http logger target type Config struct { Enabled bool `json:"enabled"` @@ -79,7 +81,7 @@ type Config struct { Transport http.RoundTripper `json:"-"` // Custom logger - LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` + LogOnceIf func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } // Target implements logger.Target and sends the json @@ -93,9 +95,10 @@ type Target struct { status int32 // Worker control - workers int64 - workerStartMu sync.Mutex - lastStarted time.Time + workers int64 + maxWorkers int64 + // workerStartMu sync.Mutex + lastStarted time.Time wg sync.WaitGroup @@ -105,23 +108,29 @@ type Target struct { logCh chan interface{} logChMu sync.RWMutex + // If this webhook is being re-configured we will + // assign the new webhook target to this field. 
+ // The Send() method will then re-direct entries + // to the new target when the current one + // has been set to status "statusClosed". + // Once the glogal target slice has been migrated + // the current target will stop receiving entries. + migrateTarget *Target + // Number of events per HTTP send to webhook target // this is ideally useful only if your endpoint can // support reading multiple events on a stream for example // like : Splunk HTTP Event collector, if you are unsure // set this to '1'. - batchSize int - - // If the first init fails, this starts a goroutine that - // will attempt to establish the connection. - revive sync.Once + batchSize int + payloadType string // store to persist and replay the logs to the target // to avoid missing events when the target is down. store store.Store[interface{}] storeCtxCancel context.CancelFunc - initQueueStoreOnce once.Init + initQueueOnce once.Init config Config client *http.Client @@ -132,6 +141,11 @@ func (h *Target) Name() string { return "minio-http-" + h.config.Name } +// Type - returns type of the target +func (h *Target) Type() types.TargetType { + return types.TargetHTTP +} + // Endpoint returns the backend endpoint func (h *Target) Endpoint() string { return h.config.Endpoint.String() @@ -146,19 +160,6 @@ func (h *Target) IsOnline(ctx context.Context) bool { return atomic.LoadInt32(&h.status) == statusOnline } -// ping returns true if the target is reachable. -func (h *Target) ping(ctx context.Context) bool { - if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil { - return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err) - } - // We are online. - h.workerStartMu.Lock() - h.lastStarted = time.Now() - h.workerStartMu.Unlock() - go h.startHTTPLogger(ctx) - return true -} - // Stats returns the target statistics. 
func (h *Target) Stats() types.TargetStats { h.logChMu.RLock() @@ -173,56 +174,34 @@ func (h *Target) Stats() types.TargetStats { return stats } +// AssignMigrateTarget assigns a target +// which will eventually replace the current target. +func (h *Target) AssignMigrateTarget(migrateTgt *Target) { + h.migrateTarget = migrateTgt +} + // Init validate and initialize the http target func (h *Target) Init(ctx context.Context) (err error) { if h.config.QueueDir != "" { - return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore) + return h.initQueueOnce.DoWithContext(ctx, h.initDiskStore) } - return h.init(ctx) + return h.initQueueOnce.DoWithContext(ctx, h.initMemoryStore) } -func (h *Target) initQueueStore(ctx context.Context) (err error) { - var queueStore store.Store[interface{}] - queueDir := filepath.Join(h.config.QueueDir, h.Name()) - queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.config.QueueSize), httpLoggerExtension) - if err = queueStore.Open(); err != nil { - return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) - } +func (h *Target) initDiskStore(ctx context.Context) (err error) { ctx, cancel := context.WithCancel(ctx) - h.store = queueStore h.storeCtxCancel = cancel - store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnce) - return + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, true) + store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnceIf) + return nil } -func (h *Target) init(ctx context.Context) (err error) { - switch atomic.LoadInt32(&h.status) { - case statusOnline: - return nil - case statusClosed: - return errors.New("target is closed") - } - - if !h.ping(ctx) { - // Start a goroutine that will continue to check if we can reach - h.revive.Do(func() { - go func() { - // Avoid stamping herd, add jitter. 
- t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second)))) - defer t.Stop() - - for range t.C { - if atomic.LoadInt32(&h.status) != statusOffline { - return - } - if h.ping(ctx) { - return - } - } - }() - }) - return err - } +func (h *Target) initMemoryStore(ctx context.Context) (err error) { + ctx, cancel := context.WithCancel(ctx) + h.storeCtxCancel = cancel + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, true) return nil } @@ -275,90 +254,244 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadType string, t } } -func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) { - const maxTries = 3 - tries := 0 - for tries < maxTries { - if atomic.LoadInt32(&h.status) == statusClosed { - // Don't retry when closing... - return - } - // sleep = (tries+2) ^ 2 milliseconds. - sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond - if sleep > time.Second { - sleep = time.Second - } - time.Sleep(sleep) - tries++ - err := h.send(ctx, payload, payloadType, webhookCallTimeout) - if err == nil { - return - } - h.config.LogOnce(ctx, err, h.Endpoint()) +func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { + h.logChMu.RLock() + if h.logCh == nil { + h.logChMu.RUnlock() + return } - if tries == maxTries { - // Even with multiple retries, count failed messages as only one. - atomic.AddInt64(&h.failedMessages, 1) - } -} + h.logChMu.RUnlock() -func (h *Target) startHTTPLogger(ctx context.Context) { atomic.AddInt64(&h.workers, 1) defer atomic.AddInt64(&h.workers, -1) - h.logChMu.RLock() - logCh := h.logCh - if logCh != nil { - // We are not allowed to add when logCh is nil - h.wg.Add(1) - defer h.wg.Done() - } - h.logChMu.RUnlock() - if logCh == nil { - return - } + h.wg.Add(1) + defer h.wg.Done() + + entries := make([]interface{}, 0) + name := h.Name() + + defer func() { + // re-load the global buffer pointer + // in case it was modified by a new target. 
+ logChLock.Lock() + currentGlobalBuffer, ok := logChBuffers[name] + logChLock.Unlock() + if !ok { + return + } + + for _, v := range entries { + select { + case currentGlobalBuffer <- v: + default: + } + } + + if mainWorker { + drain: + for { + select { + case v, ok := <-h.logCh: + if !ok { + break drain + } + + currentGlobalBuffer <- v + default: + break drain + } + } + } + }() + + var entry interface{} + var ok bool + var err error + lastBatchProcess := time.Now() buf := bytebufferpool.Get() + enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) defer bytebufferpool.Put(buf) - json := jsoniter.ConfigCompatibleWithStandardLibrary - enc := json.NewEncoder(buf) - batchSize := h.batchSize - if batchSize <= 0 { - batchSize = 1 + isDirQueue := false + if h.config.QueueDir != "" { + isDirQueue = true } - payloadType := "application/json" - if batchSize > 1 { - payloadType = "" - } + // globalBuffer is always created or adjusted + // before this method is launched. + logChLock.Lock() + globalBuffer := logChBuffers[name] + logChLock.Unlock() - var nevents int - // Send messages until channel is closed. 
- for entry := range logCh { - atomic.AddInt64(&h.totalMessages, 1) - nevents++ - if err := enc.Encode(&entry); err != nil { - atomic.AddInt64(&h.failedMessages, 1) - nevents-- - continue + newTicker := time.NewTicker(time.Second) + isTick := false + + for { + isTick = false + select { + case _ = <-newTicker.C: + isTick = true + case entry, _ = <-globalBuffer: + case entry, ok = <-h.logCh: + if !ok { + return + } + case <-ctx.Done(): + return } - if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 { - h.logEntry(ctx, buf.Bytes(), payloadType) + + if !isTick { + atomic.AddInt64(&h.totalMessages, 1) + + if !isDirQueue { + if err := enc.Encode(&entry); err != nil { + h.config.LogOnceIf( + ctx, + fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry), + h.Name(), + ) + atomic.AddInt64(&h.failedMessages, 1) + continue + } + } + + entries = append(entries, entry) + } + + if len(entries) != h.batchSize { + if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 { + continue + } + + if h.batchSize > 1 { + // If we are doing batching, we should wait + // at least one second before sending. + // Even if there is nothing in the queue. + if time.Since(lastBatchProcess).Seconds() < 1 { + continue + } + } + } + + lastBatchProcess = time.Now() + + retry: + // If the channel reaches above half capacity + // we spawn more workers. The workers spawned + // from this main worker routine will exit + // once the channel drops below half capacity + // and when it's been at least 30 seconds since + // we launched a new worker. 
+ if mainWorker && len(h.logCh) > cap(h.logCh)/2 { + nWorkers := atomic.LoadInt64(&h.workers) + if nWorkers < h.maxWorkers { + if time.Since(h.lastStarted).Milliseconds() > 10 { + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, false) + } + } + } + + if !isDirQueue { + err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout) + } else { + err = h.store.PutMultiple(entries) + } + + if err != nil { + + h.config.LogOnceIf( + context.Background(), + fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err), + name, + ) + + if errors.Is(err, context.Canceled) { + return + } + + time.Sleep(3 * time.Second) + goto retry + } + + entries = make([]interface{}, 0) + + if !isDirQueue { buf.Reset() - nevents = 0 + } + + if !mainWorker && len(h.logCh) < cap(h.logCh)/2 { + if time.Since(h.lastStarted).Seconds() > 30 { + return + } + } + + } +} + +// CreateOrAdjustGlobalBuffer will create or adjust the global log entry buffers +// which are used to migrate log entries between old and new targets. 
+func CreateOrAdjustGlobalBuffer(currentTgt *Target, newTgt *Target) { + logChLock.Lock() + defer logChLock.Unlock() + + requiredCap := currentTgt.config.QueueSize + (currentTgt.config.BatchSize * int(currentTgt.maxWorkers)) + currentCap := 0 + name := newTgt.Name() + + currentBuff, ok := logChBuffers[name] + if !ok { + logChBuffers[name] = make(chan interface{}, requiredCap) + currentCap = requiredCap + } else { + currentCap = cap(currentBuff) + requiredCap += len(currentBuff) + } + + if requiredCap > currentCap { + logChBuffers[name] = make(chan interface{}, requiredCap) + + if len(currentBuff) > 0 { + drain: + for { + select { + case v, ok := <-currentBuff: + if !ok { + break drain + } + logChBuffers[newTgt.Name()] <- v + default: + break drain + } + } } } } // New initializes a new logger target which // sends log over http to the specified endpoint -func New(config Config) *Target { +func New(config Config) (*Target, error) { + maxWorkers := maxWorkers + if config.BatchSize > 100 { + maxWorkers = maxWorkersWithBatchEvents + } else if config.BatchSize <= 0 { + config.BatchSize = 1 + } + h := &Target{ - logCh: make(chan interface{}, config.QueueSize), - config: config, - status: statusOffline, - batchSize: config.BatchSize, + logCh: make(chan interface{}, config.QueueSize), + config: config, + status: statusOffline, + batchSize: config.BatchSize, + maxWorkers: int64(maxWorkers), + } + + if config.BatchSize > 1 { + h.payloadType = "" + } else { + h.payloadType = "application/json" } // If proxy available, set the same @@ -369,32 +502,41 @@ func New(config Config) *Target { ctransport.Proxy = http.ProxyURL(proxyURL) h.config.Transport = ctransport } + h.client = &http.Client{Transport: h.config.Transport} - return h + if h.config.QueueDir != "" { + + queueStore := store.NewQueueStore[interface{}]( + filepath.Join(h.config.QueueDir, h.Name()), + uint64(h.config.QueueSize), + httpLoggerExtension, + ) + + if err := queueStore.Open(); err != nil { + return h, 
fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) + } + + h.store = queueStore + + } + + return h, nil } // SendFromStore - reads the log from store and sends it to webhook. func (h *Target) SendFromStore(key store.Key) (err error) { - var eventData interface{} - eventData, err = h.store.Get(key.Name) + var eventData []byte + eventData, err = h.store.GetRaw(key.Name) if err != nil { if os.IsNotExist(err) { return nil } return err } - atomic.AddInt64(&h.totalMessages, 1) - logJSON, err := json.Marshal(&eventData) - if err != nil { + + if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { atomic.AddInt64(&h.failedMessages, 1) - return - } - if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil { - atomic.AddInt64(&h.failedMessages, 1) - if xnet.IsNetworkOrHostDown(err, true) { - return store.ErrNotConnected - } return err } // Delete the event from store. @@ -406,12 +548,12 @@ func (h *Target) SendFromStore(key store.Key) (err error) { // If Cancel has been called the message is ignored. func (h *Target) Send(ctx context.Context, entry interface{}) error { if atomic.LoadInt32(&h.status) == statusClosed { + if h.migrateTarget != nil { + return h.migrateTarget.Send(ctx, entry) + } return nil } - if h.store != nil { - // save the entry to the queue store which will be replayed to the target. - return h.store.Put(entry) - } + h.logChMu.RLock() defer h.logChMu.RUnlock() if h.logCh == nil { @@ -419,11 +561,6 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { return nil } - mworkers := maxWorkers - if h.batchSize > 100 { - mworkers = maxWorkersWithBatchEvents - } - retry: select { case h.logCh <- entry: @@ -435,16 +572,7 @@ retry: } return nil default: - nWorkers := atomic.LoadInt64(&h.workers) - if nWorkers < int64(mworkers) { - // Only have one try to start at the same time. 
- h.workerStartMu.Lock() - if time.Since(h.lastStarted) > time.Second { - h.lastStarted = time.Now() - go h.startHTTPLogger(ctx) - } - h.workerStartMu.Unlock() - + if h.workers < h.maxWorkers { goto retry } atomic.AddInt64(&h.totalMessages, 1) @@ -460,12 +588,10 @@ retry: // All messages sent to the target after this function has been called will be dropped. func (h *Target) Cancel() { atomic.StoreInt32(&h.status, statusClosed) + h.storeCtxCancel() - // If queuestore is configured, cancel it's context to - // stop the replay go-routine. - if h.store != nil { - h.storeCtxCancel() - } + // Wait for messages to be sent... + h.wg.Wait() // Set logch to nil and close it. // This will block all Send operations, @@ -475,12 +601,4 @@ func (h *Target) Cancel() { xioutil.SafeClose(h.logCh) h.logCh = nil h.logChMu.Unlock() - - // Wait for messages to be sent... - h.wg.Wait() -} - -// Type - returns type of the target -func (h *Target) Type() types.TargetType { - return types.TargetHTTP } diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 893e06664..be66479a6 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -44,13 +44,18 @@ type Target interface { } var ( - swapAuditMuRW sync.RWMutex - swapSystemMuRW sync.RWMutex // systemTargets is the set of enabled loggers. // Must be immutable at all times. // Can be swapped to another while holding swapMu - systemTargets = []Target{} + systemTargets = []Target{} + swapSystemMuRW sync.RWMutex + + // auditTargets is the list of enabled audit loggers + // Must be immutable at all times. + // Can be swapped to another while holding swapMu + auditTargets = []Target{} + swapAuditMuRW sync.RWMutex // This is always set represent /dev/console target consoleTgt Target @@ -103,13 +108,6 @@ func CurrentStats() map[string]types.TargetStats { return res } -// auditTargets is the list of enabled audit loggers -// Must be immutable at all times. 
-// Can be swapped to another while holding swapMu -var ( - auditTargets = []Target{} -) - // AddSystemTarget adds a new logger target to the // list of enabled loggers func AddSystemTarget(ctx context.Context, t Target) error { @@ -132,23 +130,6 @@ func AddSystemTarget(ctx context.Context, t Target) error { return nil } -func initSystemTargets(ctx context.Context, cfgMap map[string]http.Config) ([]Target, []error) { - tgts := []Target{} - errs := []error{} - for _, l := range cfgMap { - if l.Enabled { - t := http.New(l) - tgts = append(tgts, t) - - e := t.Init(ctx) - if e != nil { - errs = append(errs, e) - } - } - } - return tgts, errs -} - func initKafkaTargets(ctx context.Context, cfgMap map[string]kafka.Config) ([]Target, []error) { tgts := []Target{} errs := []error{} @@ -183,36 +164,67 @@ func splitTargets(targets []Target, t types.TargetType) (group1 []Target, group2 func cancelTargets(targets []Target) { for _, target := range targets { - target.Cancel() + go target.Cancel() } } -// UpdateSystemTargets swaps targets with newly loaded ones from the cfg -func UpdateSystemTargets(ctx context.Context, cfg Config) []error { - newTgts, errs := initSystemTargets(ctx, cfg.HTTP) - - swapSystemMuRW.Lock() - consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) - newTgts = append(newTgts, consoleTargets...) 
- systemTargets = newTgts - swapSystemMuRW.Unlock() - - cancelTargets(otherTargets) // cancel running targets - return errs +// UpdateHTTPWebhooks swaps system webhook targets with newly loaded ones from the cfg +func UpdateHTTPWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) { + return updateHTTPTargets(ctx, cfgs, &systemTargets) } -// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg -func UpdateAuditWebhookTargets(ctx context.Context, cfg Config) []error { - newWebhookTgts, errs := initSystemTargets(ctx, cfg.AuditWebhook) +// UpdateAuditWebhooks swaps audit webhook targets with newly loaded ones from the cfg +func UpdateAuditWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) { + return updateHTTPTargets(ctx, cfgs, &auditTargets) +} + +func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetList *[]Target) (errs []error) { + tgts := make([]*http.Target, 0) + newWebhooks := make([]Target, 0) + for _, cfg := range cfgs { + if cfg.Enabled { + t, err := http.New(cfg) + if err != nil { + errs = append(errs, err) + } + tgts = append(tgts, t) + newWebhooks = append(newWebhooks, t) + } + } + + oldTargets := make([]Target, len(*targetList)) + copy(oldTargets, *targetList) + + for i := range oldTargets { + currentTgt, ok := oldTargets[i].(*http.Target) + if !ok { + continue + } + var newTgt *http.Target + + for ii := range tgts { + if currentTgt.Name() == tgts[ii].Name() { + newTgt = tgts[ii] + currentTgt.AssignMigrateTarget(newTgt) + http.CreateOrAdjustGlobalBuffer(currentTgt, newTgt) + break + } + } + } + + for _, t := range tgts { + err := t.Init(ctx) + if err != nil { + errs = append(errs, err) + } + } swapAuditMuRW.Lock() - // Retain kafka targets - oldWebhookTgts, otherTgts := splitTargets(auditTargets, types.TargetHTTP) - newWebhookTgts = append(newWebhookTgts, otherTgts...) 
- auditTargets = newWebhookTgts + *targetList = newWebhooks swapAuditMuRW.Unlock() - cancelTargets(oldWebhookTgts) // cancel running targets + cancelTargets(oldTargets) + return errs } diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index a9aa79bfc..5f5e3f129 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -28,6 +28,8 @@ import ( "time" "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + "github.com/valyala/bytebufferpool" ) const ( @@ -84,6 +86,7 @@ func (store *QueueStore[_]) Open() error { if uint64(len(files)) > store.entryLimit { files = files[:store.entryLimit] } + for _, file := range files { if file.IsDir() { continue @@ -97,6 +100,54 @@ func (store *QueueStore[_]) Open() error { return nil } +// Delete - Remove the store directory from disk +func (store *QueueStore[_]) Delete() error { + return os.Remove(store.directory) +} + +// PutMultiple - puts an item to the store. +func (store *QueueStore[I]) PutMultiple(item []I) error { + store.Lock() + defer store.Unlock() + if uint64(len(store.entries)) >= store.entryLimit { + return errLimitExceeded + } + // Generate a new UUID for the key. + key, err := uuid.NewRandom() + if err != nil { + return err + } + return store.multiWrite(key.String(), item) +} + +// multiWrite - writes an item to the directory. +func (store *QueueStore[I]) multiWrite(key string, item []I) error { + buf := bytebufferpool.Get() + defer bytebufferpool.Put(buf) + + enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) + + for i := range item { + err := enc.Encode(item[i]) + if err != nil { + return err + } + } + b := buf.Bytes() + + path := filepath.Join(store.directory, key+store.fileExt) + err := os.WriteFile(path, b, os.FileMode(0o770)) + buf.Reset() + if err != nil { + return err + } + + // Increment the item count. + store.entries[key] = time.Now().UnixNano() + + return nil +} + // write - writes an item to the directory. 
func (store *QueueStore[I]) write(key string, item I) error { // Marshalls the item. @@ -131,6 +182,30 @@ func (store *QueueStore[I]) Put(item I) error { return store.write(key.String(), item) } +// GetRaw - gets an item from the store. +func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) { + store.RLock() + + defer func(store *QueueStore[I]) { + store.RUnlock() + if err != nil { + // Upon error we remove the entry. + store.Del(key) + } + }(store) + + raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) + if err != nil { + return + } + + if len(raw) == 0 { + return raw, os.ErrNotExist + } + + return +} + // Get - gets an item from the store. func (store *QueueStore[I]) Get(key string) (item I, err error) { store.RLock() diff --git a/internal/store/store.go b/internal/store/store.go index 4f5b6a60e..a72721856 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -25,7 +25,6 @@ import ( "time" xioutil "github.com/minio/minio/internal/ioutil" - xnet "github.com/minio/pkg/v2/net" ) const ( @@ -46,12 +45,15 @@ type Target interface { // Store - Used to persist items. 
type Store[I any] interface { Put(item I) error + PutMultiple(item []I) error Get(key string) (I, error) + GetRaw(key string) ([]byte, error) Len() int List() ([]string, error) Del(key string) error DelList(key []string) error Open() error + Delete() error Extension() string } @@ -110,15 +112,14 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l break } - if err != ErrNotConnected && !xnet.IsConnResetErr(err) { - logger(context.Background(), - fmt.Errorf("target.SendFromStore() failed with '%w'", err), - target.Name()) - } - - // Retrying after 3secs back-off + logger( + context.Background(), + fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err), + target.Name(), + ) select { + // Retrying after 3secs back-off case <-retryTicker.C: case <-doneCh: return false @@ -131,7 +132,6 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l select { case key, ok := <-keyCh: if !ok { - // closed channel. return } @@ -147,9 +147,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l // StreamItems reads the keys from the store and replays the corresponding item to the target. func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger) { go func() { - // Replays the items from the store. keyCh := replayItems(store, doneCh, logger, target.Name()) - // Send items from the store. sendItems(target, keyCh, doneCh, logger) }() }