From ba54b39c021a4739e3713d8e296bf7c800e2fc19 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 1 Jun 2024 20:03:39 -0700 Subject: [PATCH] fix: crash when audit webhook queue_dir is not writable (#19854) This is regression introduced in #19275 refactor --- internal/logger/target/http/http.go | 35 +++++++++++---------------- internal/logger/target/kafka/kafka.go | 3 +-- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index 894355360..c29a97786 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -193,7 +193,20 @@ func (h *Target) initDiskStore(ctx context.Context) (err error) { h.storeCtxCancel = cancel h.lastStarted = time.Now() go h.startQueueProcessor(ctx, true) + + queueStore := store.NewQueueStore[interface{}]( + filepath.Join(h.config.QueueDir, h.Name()), + uint64(h.config.QueueSize), + httpLoggerExtension, + ) + + if err := queueStore.Open(); err != nil { + return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) + } + + h.store = queueStore store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnceIf) + return nil } @@ -314,10 +327,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf) defer bytebufferpool.Put(buf) - isDirQueue := false - if h.config.QueueDir != "" { - isDirQueue = true - } + isDirQueue := h.config.QueueDir != "" // globalBuffer is always created or adjusted // before this method is launched. @@ -504,23 +514,6 @@ func New(config Config) (*Target, error) { } h.client = &http.Client{Transport: h.config.Transport} - - if h.config.QueueDir != "" { - - queueStore := store.NewQueueStore[interface{}]( - filepath.Join(h.config.QueueDir, h.Name()), - uint64(h.config.QueueSize), - httpLoggerExtension, - ) - - if err := queueStore.Open(); err != nil { - return h, fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) - } - - h.store = queueStore - - } - return h, nil } diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index d091ec884..29a0bf013 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -169,9 +169,8 @@ func (h *Target) Init(ctx context.Context) error { } func (h *Target) initQueueStore(ctx context.Context) (err error) { - var queueStore store.Store[interface{}] queueDir := filepath.Join(h.kconfig.QueueDir, h.Name()) - queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension) + queueStore := store.NewQueueStore[interface{}](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension) if err = queueStore.Open(); err != nil { return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) }