diff --git a/cmd/config-current.go b/cmd/config-current.go index d84d823bf..c344fadbf 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -620,13 +620,13 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := getUserAgent(getMinioMode()) for n, l := range loggerCfg.HTTP { if l.Enabled { - l.LogOnce = logger.LogOnceConsoleIf + l.LogOnceIf = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) } loggerCfg.HTTP[n] = l } - if errs := logger.UpdateSystemTargets(ctx, loggerCfg); len(errs) > 0 { + if errs := logger.UpdateHTTPWebhooks(ctx, loggerCfg.HTTP); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs)) } case config.AuditWebhookSubSys: @@ -637,14 +637,14 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf userAgent := getUserAgent(getMinioMode()) for n, l := range loggerCfg.AuditWebhook { if l.Enabled { - l.LogOnce = logger.LogOnceConsoleIf + l.LogOnceIf = logger.LogOnceConsoleIf l.UserAgent = userAgent l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey) } loggerCfg.AuditWebhook[n] = l } - if errs := logger.UpdateAuditWebhookTargets(ctx, loggerCfg); len(errs) > 0 { + if errs := logger.UpdateAuditWebhooks(ctx, loggerCfg.AuditWebhook); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs)) } case config.AuditKafkaSubSys: diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 9dcc24f2e..8efa14d8e 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -20,11 +20,8 @@ package http import ( "bytes" "context" - "encoding/json" "errors" "fmt" - "math" - "math/rand" "net/http" "net/url" "os" @@ -63,6 +60,11 @@ const ( statusClosed ) +var ( + logChBuffers = make(map[string]chan interface{}) + logChLock = sync.Mutex{} +) + // Config http logger target type Config struct { Enabled bool `json:"enabled"` @@ -79,7 +81,7 @@ type Config struct { Transport http.RoundTripper `json:"-"` // Custom logger - LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` + LogOnceIf func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } // Target implements logger.Target and sends the json @@ -93,9 +95,10 @@ type Target struct { status int32 // Worker control - workers int64 - workerStartMu sync.Mutex - lastStarted time.Time + workers int64 + maxWorkers int64 + // workerStartMu sync.Mutex + lastStarted time.Time wg sync.WaitGroup @@ -105,23 +108,29 @@ type Target struct { logCh chan interface{} logChMu sync.RWMutex + // If this webhook is being re-configured we will + // assign the new webhook target to this field. + // The Send() method will then re-direct entries + // to the new target when the current one + // has been set to status "statusClosed". + // Once the glogal target slice has been migrated + // the current target will stop receiving entries. + migrateTarget *Target + // Number of events per HTTP send to webhook target // this is ideally useful only if your endpoint can // support reading multiple events on a stream for example // like : Splunk HTTP Event collector, if you are unsure // set this to '1'. - batchSize int - - // If the first init fails, this starts a goroutine that - // will attempt to establish the connection. - revive sync.Once + batchSize int + payloadType string // store to persist and replay the logs to the target // to avoid missing events when the target is down. store store.Store[interface{}] storeCtxCancel context.CancelFunc - initQueueStoreOnce once.Init + initQueueOnce once.Init config Config client *http.Client @@ -132,6 +141,11 @@ func (h *Target) Name() string { return "minio-http-" + h.config.Name } +// Type - returns type of the target +func (h *Target) Type() types.TargetType { + return types.TargetHTTP +} + // Endpoint returns the backend endpoint func (h *Target) Endpoint() string { return h.config.Endpoint.String() @@ -146,19 +160,6 @@ func (h *Target) IsOnline(ctx context.Context) bool { return atomic.LoadInt32(&h.status) == statusOnline } -// ping returns true if the target is reachable. -func (h *Target) ping(ctx context.Context) bool { - if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil { - return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err) - } - // We are online. - h.workerStartMu.Lock() - h.lastStarted = time.Now() - h.workerStartMu.Unlock() - go h.startHTTPLogger(ctx) - return true -} - // Stats returns the target statistics. func (h *Target) Stats() types.TargetStats { h.logChMu.RLock() @@ -173,56 +174,34 @@ func (h *Target) Stats() types.TargetStats { return stats } +// AssignMigrateTarget assigns a target +// which will eventually replace the current target. +func (h *Target) AssignMigrateTarget(migrateTgt *Target) { + h.migrateTarget = migrateTgt +} + // Init validate and initialize the http target func (h *Target) Init(ctx context.Context) (err error) { if h.config.QueueDir != "" { - return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore) + return h.initQueueOnce.DoWithContext(ctx, h.initDiskStore) } - return h.init(ctx) + return h.initQueueOnce.DoWithContext(ctx, h.initMemoryStore) } -func (h *Target) initQueueStore(ctx context.Context) (err error) { - var queueStore store.Store[interface{}] - queueDir := filepath.Join(h.config.QueueDir, h.Name()) - queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.config.QueueSize), httpLoggerExtension) - if err = queueStore.Open(); err != nil { - return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) - } +func (h *Target) initDiskStore(ctx context.Context) (err error) { ctx, cancel := context.WithCancel(ctx) - h.store = queueStore h.storeCtxCancel = cancel - store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnce) - return + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, true) + store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnceIf) + return nil } -func (h *Target) init(ctx context.Context) (err error) { - switch atomic.LoadInt32(&h.status) { - case statusOnline: - return nil - case statusClosed: - return errors.New("target is closed") - } - - if !h.ping(ctx) { - // Start a goroutine that will continue to check if we can reach - h.revive.Do(func() { - go func() { - // Avoid stamping herd, add jitter. - t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second)))) - defer t.Stop() - - for range t.C { - if atomic.LoadInt32(&h.status) != statusOffline { - return - } - if h.ping(ctx) { - return - } - } - }() - }) - return err - } +func (h *Target) initMemoryStore(ctx context.Context) (err error) { + ctx, cancel := context.WithCancel(ctx) + h.storeCtxCancel = cancel + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, true) return nil } @@ -275,90 +254,244 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadType string, t } } -func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) { - const maxTries = 3 - tries := 0 - for tries < maxTries { - if atomic.LoadInt32(&h.status) == statusClosed { - // Don't retry when closing... - return - } - // sleep = (tries+2) ^ 2 milliseconds. - sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond - if sleep > time.Second { - sleep = time.Second - } - time.Sleep(sleep) - tries++ - err := h.send(ctx, payload, payloadType, webhookCallTimeout) - if err == nil { - return - } - h.config.LogOnce(ctx, err, h.Endpoint()) +func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { + h.logChMu.RLock() + if h.logCh == nil { + h.logChMu.RUnlock() + return } - if tries == maxTries { - // Even with multiple retries, count failed messages as only one. - atomic.AddInt64(&h.failedMessages, 1) - } -} + h.logChMu.RUnlock() -func (h *Target) startHTTPLogger(ctx context.Context) { atomic.AddInt64(&h.workers, 1) defer atomic.AddInt64(&h.workers, -1) - h.logChMu.RLock() - logCh := h.logCh - if logCh != nil { - // We are not allowed to add when logCh is nil - h.wg.Add(1) - defer h.wg.Done() - } - h.logChMu.RUnlock() - if logCh == nil { - return - } + h.wg.Add(1) + defer h.wg.Done() + + entries := make([]interface{}, 0) + name := h.Name() + + defer func() { + // re-load the global buffer pointer + // in case it was modified by a new target. + logChLock.Lock() + currentGlobalBuffer, ok := logChBuffers[name] + logChLock.Unlock() + if !ok { + return + } + + for _, v := range entries { + select { + case currentGlobalBuffer <- v: + default: + } + } + + if mainWorker { + drain: + for { + select { + case v, ok := <-h.logCh: + if !ok { + break drain + } + + currentGlobalBuffer <- v + default: + break drain + } + } + } + }() + + var entry interface{} + var ok bool + var err error + lastBatchProcess := time.Now() buf := bytebufferpool.Get() + enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) defer bytebufferpool.Put(buf) - json := jsoniter.ConfigCompatibleWithStandardLibrary - enc := json.NewEncoder(buf) - batchSize := h.batchSize - if batchSize <= 0 { - batchSize = 1 + isDirQueue := false + if h.config.QueueDir != "" { + isDirQueue = true } - payloadType := "application/json" - if batchSize > 1 { - payloadType = "" - } + // globalBuffer is always created or adjusted + // before this method is launched. + logChLock.Lock() + globalBuffer := logChBuffers[name] + logChLock.Unlock() - var nevents int - // Send messages until channel is closed. - for entry := range logCh { - atomic.AddInt64(&h.totalMessages, 1) - nevents++ - if err := enc.Encode(&entry); err != nil { - atomic.AddInt64(&h.failedMessages, 1) - nevents-- - continue + newTicker := time.NewTicker(time.Second) + isTick := false + + for { + isTick = false + select { + case _ = <-newTicker.C: + isTick = true + case entry, _ = <-globalBuffer: + case entry, ok = <-h.logCh: + if !ok { + return + } + case <-ctx.Done(): + return } - if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 { - h.logEntry(ctx, buf.Bytes(), payloadType) + + if !isTick { + atomic.AddInt64(&h.totalMessages, 1) + + if !isDirQueue { + if err := enc.Encode(&entry); err != nil { + h.config.LogOnceIf( + ctx, + fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry), + h.Name(), + ) + atomic.AddInt64(&h.failedMessages, 1) + continue + } + } + + entries = append(entries, entry) + } + + if len(entries) != h.batchSize { + if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 { + continue + } + + if h.batchSize > 1 { + // If we are doing batching, we should wait + // at least one second before sending. + // Even if there is nothing in the queue. + if time.Since(lastBatchProcess).Seconds() < 1 { + continue + } + } + } + + lastBatchProcess = time.Now() + + retry: + // If the channel reaches above half capacity + // we spawn more workers. The workers spawned + // from this main worker routine will exit + // once the channel drops below half capacity + // and when it's been at least 30 seconds since + // we launched a new worker. + if mainWorker && len(h.logCh) > cap(h.logCh)/2 { + nWorkers := atomic.LoadInt64(&h.workers) + if nWorkers < h.maxWorkers { + if time.Since(h.lastStarted).Milliseconds() > 10 { + h.lastStarted = time.Now() + go h.startQueueProcessor(ctx, false) + } + } + } + + if !isDirQueue { + err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout) + } else { + err = h.store.PutMultiple(entries) + } + + if err != nil { + + h.config.LogOnceIf( + context.Background(), + fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err), + name, + ) + + if errors.Is(err, context.Canceled) { + return + } + + time.Sleep(3 * time.Second) + goto retry + } + + entries = make([]interface{}, 0) + + if !isDirQueue { buf.Reset() - nevents = 0 + } + + if !mainWorker && len(h.logCh) < cap(h.logCh)/2 { + if time.Since(h.lastStarted).Seconds() > 30 { + return + } + } + + } +} + +// CreateOrAdjustGlobalBuffer will create or adjust the global log entry buffers +// which are used to migrate log entries between old and new targets. +func CreateOrAdjustGlobalBuffer(currentTgt *Target, newTgt *Target) { + logChLock.Lock() + defer logChLock.Unlock() + + requiredCap := currentTgt.config.QueueSize + (currentTgt.config.BatchSize * int(currentTgt.maxWorkers)) + currentCap := 0 + name := newTgt.Name() + + currentBuff, ok := logChBuffers[name] + if !ok { + logChBuffers[name] = make(chan interface{}, requiredCap) + currentCap = requiredCap + } else { + currentCap = cap(currentBuff) + requiredCap += len(currentBuff) + } + + if requiredCap > currentCap { + logChBuffers[name] = make(chan interface{}, requiredCap) + + if len(currentBuff) > 0 { + drain: + for { + select { + case v, ok := <-currentBuff: + if !ok { + break drain + } + logChBuffers[newTgt.Name()] <- v + default: + break drain + } + } } } } // New initializes a new logger target which // sends log over http to the specified endpoint -func New(config Config) *Target { +func New(config Config) (*Target, error) { + maxWorkers := maxWorkers + if config.BatchSize > 100 { + maxWorkers = maxWorkersWithBatchEvents + } else if config.BatchSize <= 0 { + config.BatchSize = 1 + } + h := &Target{ - logCh: make(chan interface{}, config.QueueSize), - config: config, - status: statusOffline, - batchSize: config.BatchSize, + logCh: make(chan interface{}, config.QueueSize), + config: config, + status: statusOffline, + batchSize: config.BatchSize, + maxWorkers: int64(maxWorkers), + } + + if config.BatchSize > 1 { + h.payloadType = "" + } else { + h.payloadType = "application/json" } // If proxy available, set the same @@ -369,32 +502,41 @@ func New(config Config) *Target { ctransport.Proxy = http.ProxyURL(proxyURL) h.config.Transport = ctransport } + h.client = &http.Client{Transport: h.config.Transport} - return h + if h.config.QueueDir != "" { + + queueStore := store.NewQueueStore[interface{}]( + filepath.Join(h.config.QueueDir, h.Name()), + uint64(h.config.QueueSize), + httpLoggerExtension, + ) + + if err := queueStore.Open(); err != nil { + return h, fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) + } + + h.store = queueStore + + } + + return h, nil } // SendFromStore - reads the log from store and sends it to webhook. func (h *Target) SendFromStore(key store.Key) (err error) { - var eventData interface{} - eventData, err = h.store.Get(key.Name) + var eventData []byte + eventData, err = h.store.GetRaw(key.Name) if err != nil { if os.IsNotExist(err) { return nil } return err } - atomic.AddInt64(&h.totalMessages, 1) - logJSON, err := json.Marshal(&eventData) - if err != nil { + + if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { atomic.AddInt64(&h.failedMessages, 1) - return - } - if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil { - atomic.AddInt64(&h.failedMessages, 1) - if xnet.IsNetworkOrHostDown(err, true) { - return store.ErrNotConnected - } return err } // Delete the event from store. @@ -406,12 +548,12 @@ func (h *Target) SendFromStore(key store.Key) (err error) { // If Cancel has been called the message is ignored. func (h *Target) Send(ctx context.Context, entry interface{}) error { if atomic.LoadInt32(&h.status) == statusClosed { + if h.migrateTarget != nil { + return h.migrateTarget.Send(ctx, entry) + } return nil } - if h.store != nil { - // save the entry to the queue store which will be replayed to the target. - return h.store.Put(entry) - } + h.logChMu.RLock() defer h.logChMu.RUnlock() if h.logCh == nil { @@ -419,11 +561,6 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { return nil } - mworkers := maxWorkers - if h.batchSize > 100 { - mworkers = maxWorkersWithBatchEvents - } - retry: select { case h.logCh <- entry: @@ -435,16 +572,7 @@ retry: } return nil default: - nWorkers := atomic.LoadInt64(&h.workers) - if nWorkers < int64(mworkers) { - // Only have one try to start at the same time. - h.workerStartMu.Lock() - if time.Since(h.lastStarted) > time.Second { - h.lastStarted = time.Now() - go h.startHTTPLogger(ctx) - } - h.workerStartMu.Unlock() - + if h.workers < h.maxWorkers { goto retry } atomic.AddInt64(&h.totalMessages, 1) @@ -460,12 +588,10 @@ retry: // All messages sent to the target after this function has been called will be dropped. func (h *Target) Cancel() { atomic.StoreInt32(&h.status, statusClosed) + h.storeCtxCancel() - // If queuestore is configured, cancel it's context to - // stop the replay go-routine. - if h.store != nil { - h.storeCtxCancel() - } + // Wait for messages to be sent... + h.wg.Wait() // Set logch to nil and close it. // This will block all Send operations, @@ -475,12 +601,4 @@ func (h *Target) Cancel() { xioutil.SafeClose(h.logCh) h.logCh = nil h.logChMu.Unlock() - - // Wait for messages to be sent... - h.wg.Wait() -} - -// Type - returns type of the target -func (h *Target) Type() types.TargetType { - return types.TargetHTTP } diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 893e06664..be66479a6 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -44,13 +44,18 @@ type Target interface { } var ( - swapAuditMuRW sync.RWMutex - swapSystemMuRW sync.RWMutex // systemTargets is the set of enabled loggers. // Must be immutable at all times. // Can be swapped to another while holding swapMu - systemTargets = []Target{} + systemTargets = []Target{} + swapSystemMuRW sync.RWMutex + + // auditTargets is the list of enabled audit loggers + // Must be immutable at all times. + // Can be swapped to another while holding swapMu + auditTargets = []Target{} + swapAuditMuRW sync.RWMutex // This is always set represent /dev/console target consoleTgt Target @@ -103,13 +108,6 @@ func CurrentStats() map[string]types.TargetStats { return res } -// auditTargets is the list of enabled audit loggers -// Must be immutable at all times. -// Can be swapped to another while holding swapMu -var ( - auditTargets = []Target{} -) - // AddSystemTarget adds a new logger target to the // list of enabled loggers func AddSystemTarget(ctx context.Context, t Target) error { @@ -132,23 +130,6 @@ func AddSystemTarget(ctx context.Context, t Target) error { return nil } -func initSystemTargets(ctx context.Context, cfgMap map[string]http.Config) ([]Target, []error) { - tgts := []Target{} - errs := []error{} - for _, l := range cfgMap { - if l.Enabled { - t := http.New(l) - tgts = append(tgts, t) - - e := t.Init(ctx) - if e != nil { - errs = append(errs, e) - } - } - } - return tgts, errs -} - func initKafkaTargets(ctx context.Context, cfgMap map[string]kafka.Config) ([]Target, []error) { tgts := []Target{} errs := []error{} @@ -183,36 +164,67 @@ func splitTargets(targets []Target, t types.TargetType) (group1 []Target, group2 func cancelTargets(targets []Target) { for _, target := range targets { - target.Cancel() + go target.Cancel() } } -// UpdateSystemTargets swaps targets with newly loaded ones from the cfg -func UpdateSystemTargets(ctx context.Context, cfg Config) []error { - newTgts, errs := initSystemTargets(ctx, cfg.HTTP) - - swapSystemMuRW.Lock() - consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) - newTgts = append(newTgts, consoleTargets...) - systemTargets = newTgts - swapSystemMuRW.Unlock() - - cancelTargets(otherTargets) // cancel running targets - return errs +// UpdateHTTPWebhooks swaps system webhook targets with newly loaded ones from the cfg +func UpdateHTTPWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) { + return updateHTTPTargets(ctx, cfgs, &systemTargets) } -// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg -func UpdateAuditWebhookTargets(ctx context.Context, cfg Config) []error { - newWebhookTgts, errs := initSystemTargets(ctx, cfg.AuditWebhook) +// UpdateAuditWebhooks swaps audit webhook targets with newly loaded ones from the cfg +func UpdateAuditWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) { + return updateHTTPTargets(ctx, cfgs, &auditTargets) +} + +func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetList *[]Target) (errs []error) { + tgts := make([]*http.Target, 0) + newWebhooks := make([]Target, 0) + for _, cfg := range cfgs { + if cfg.Enabled { + t, err := http.New(cfg) + if err != nil { + errs = append(errs, err) + } + tgts = append(tgts, t) + newWebhooks = append(newWebhooks, t) + } + } + + oldTargets := make([]Target, len(*targetList)) + copy(oldTargets, *targetList) + + for i := range oldTargets { + currentTgt, ok := oldTargets[i].(*http.Target) + if !ok { + continue + } + var newTgt *http.Target + + for ii := range tgts { + if currentTgt.Name() == tgts[ii].Name() { + newTgt = tgts[ii] + currentTgt.AssignMigrateTarget(newTgt) + http.CreateOrAdjustGlobalBuffer(currentTgt, newTgt) + break + } + } + } + + for _, t := range tgts { + err := t.Init(ctx) + if err != nil { + errs = append(errs, err) + } + } swapAuditMuRW.Lock() - // Retain kafka targets - oldWebhookTgts, otherTgts := splitTargets(auditTargets, types.TargetHTTP) - newWebhookTgts = append(newWebhookTgts, otherTgts...) - auditTargets = newWebhookTgts + *targetList = newWebhooks swapAuditMuRW.Unlock() - cancelTargets(oldWebhookTgts) // cancel running targets + cancelTargets(oldTargets) + return errs } diff --git a/internal/store/queuestore.go b/internal/store/queuestore.go index a9aa79bfc..5f5e3f129 100644 --- a/internal/store/queuestore.go +++ b/internal/store/queuestore.go @@ -28,6 +28,8 @@ import ( "time" "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + "github.com/valyala/bytebufferpool" ) const ( @@ -84,6 +86,7 @@ func (store *QueueStore[_]) Open() error { if uint64(len(files)) > store.entryLimit { files = files[:store.entryLimit] } + for _, file := range files { if file.IsDir() { continue @@ -97,6 +100,54 @@ func (store *QueueStore[_]) Open() error { return nil } +// Delete - Remove the store directory from disk +func (store *QueueStore[_]) Delete() error { + return os.Remove(store.directory) +} + +// PutMultiple - puts an item to the store. +func (store *QueueStore[I]) PutMultiple(item []I) error { + store.Lock() + defer store.Unlock() + if uint64(len(store.entries)) >= store.entryLimit { + return errLimitExceeded + } + // Generate a new UUID for the key. + key, err := uuid.NewRandom() + if err != nil { + return err + } + return store.multiWrite(key.String(), item) +} + +// multiWrite - writes an item to the directory. +func (store *QueueStore[I]) multiWrite(key string, item []I) error { + buf := bytebufferpool.Get() + defer bytebufferpool.Put(buf) + + enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) + + for i := range item { + err := enc.Encode(item[i]) + if err != nil { + return err + } + } + b := buf.Bytes() + + path := filepath.Join(store.directory, key+store.fileExt) + err := os.WriteFile(path, b, os.FileMode(0o770)) + buf.Reset() + if err != nil { + return err + } + + // Increment the item count. + store.entries[key] = time.Now().UnixNano() + + return nil +} + // write - writes an item to the directory. func (store *QueueStore[I]) write(key string, item I) error { // Marshalls the item. @@ -131,6 +182,30 @@ func (store *QueueStore[I]) Put(item I) error { return store.write(key.String(), item) } +// GetRaw - gets an item from the store. +func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) { + store.RLock() + + defer func(store *QueueStore[I]) { + store.RUnlock() + if err != nil { + // Upon error we remove the entry. + store.Del(key) + } + }(store) + + raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt)) + if err != nil { + return + } + + if len(raw) == 0 { + return raw, os.ErrNotExist + } + + return +} + // Get - gets an item from the store. func (store *QueueStore[I]) Get(key string) (item I, err error) { store.RLock() diff --git a/internal/store/store.go b/internal/store/store.go index 4f5b6a60e..a72721856 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -25,7 +25,6 @@ import ( "time" xioutil "github.com/minio/minio/internal/ioutil" - xnet "github.com/minio/pkg/v2/net" ) const ( @@ -46,12 +45,15 @@ type Target interface { // Store - Used to persist items. type Store[I any] interface { Put(item I) error + PutMultiple(item []I) error Get(key string) (I, error) + GetRaw(key string) ([]byte, error) Len() int List() ([]string, error) Del(key string) error DelList(key []string) error Open() error + Delete() error Extension() string } @@ -110,15 +112,14 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l break } - if err != ErrNotConnected && !xnet.IsConnResetErr(err) { - logger(context.Background(), - fmt.Errorf("target.SendFromStore() failed with '%w'", err), - target.Name()) - } - - // Retrying after 3secs back-off + logger( + context.Background(), + fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err), + target.Name(), + ) select { + // Retrying after 3secs back-off case <-retryTicker.C: case <-doneCh: return false @@ -131,7 +132,6 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l select { case key, ok := <-keyCh: if !ok { - // closed channel. return } @@ -147,9 +147,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l // StreamItems reads the keys from the store and replays the corresponding item to the target. func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger) { go func() { - // Replays the items from the store. keyCh := replayItems(store, doneCh, logger, target.Name()) - // Send items from the store. sendItems(target, keyCh, doneCh, logger) }() }