diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index f65ebc4be..3173e9c10 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -154,14 +154,15 @@ func (k KafkaArgs) Validate() error { type KafkaTarget struct { initOnce once.Init - id event.TargetID - args KafkaArgs - producer sarama.SyncProducer - config *sarama.Config - store store.Store[event.Event] - batch *store.Batch[string, *sarama.ProducerMessage] - loggerOnce logger.LogOnce - quitCh chan struct{} + id event.TargetID + args KafkaArgs + producer sarama.SyncProducer + config *sarama.Config + store store.Store[event.Event] + batch *store.Batch[string, *sarama.ProducerMessage] + loggerOnce logger.LogOnce + brokerConns map[string]net.Conn + quitCh chan struct{} } // ID - returns target ID. @@ -188,7 +189,7 @@ func (target *KafkaTarget) IsActive() (bool, error) { } func (target *KafkaTarget) isActive() (bool, error) { - if err := target.args.pingBrokers(); err != nil { + if err := target.pingBrokers(); err != nil { return false, store.ErrNotConnected } return true, nil @@ -202,10 +203,6 @@ func (target *KafkaTarget) Save(eventData event.Event) error { if err := target.init(); err != nil { return err } - _, err := target.isActive() - if err != nil { - return err - } return target.send(eventData) } @@ -234,12 +231,6 @@ func (target *KafkaTarget) SendFromStore(key store.Key) error { return target.addToBatch(key) } - var err error - _, err = target.isActive() - if err != nil { - return err - } - eventData, eErr := target.store.Get(key.Name) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() @@ -250,8 +241,7 @@ func (target *KafkaTarget) SendFromStore(key store.Key) error { return eErr } - err = target.send(eventData) - if err != nil { + if err := target.send(eventData); err != nil { if isKafkaConnErr(err) { return store.ErrNotConnected } @@ -292,9 +282,6 @@ func (target *KafkaTarget) addToBatch(key store.Key) error { } func (target *KafkaTarget) commitBatch() error { - if _, err := target.isActive(); err != nil { - return err - } keys, msgs, err := target.batch.GetAll() if err != nil { return err @@ -333,23 +320,38 @@ func (target *KafkaTarget) Close() error { if target.producer != nil { return target.producer.Close() } + for _, conn := range target.brokerConns { + if conn != nil { + conn.Close() + } + } return nil } // Check if at least one broker in cluster is active -func (k KafkaArgs) pingBrokers() (err error) { +func (target *KafkaTarget) pingBrokers() (err error) { d := net.Dialer{Timeout: 1 * time.Second} - errs := make([]error, len(k.Brokers)) + errs := make([]error, len(target.args.Brokers)) var wg sync.WaitGroup - for idx, broker := range k.Brokers { + for idx, broker := range target.args.Brokers { broker := broker idx := idx wg.Add(1) go func(broker xnet.Host, idx int) { defer wg.Done() - - _, errs[idx] = d.Dial("tcp", broker.String()) + conn, ok := target.brokerConns[broker.String()] + if !ok || conn == nil { + conn, errs[idx] = d.Dial("tcp", broker.String()) + if errs[idx] != nil { + return + } + target.brokerConns[broker.String()] = conn + } + if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil { + conn.Close() + target.brokerConns[broker.String()] = nil + } }(broker, idx) } wg.Wait() @@ -461,11 +463,12 @@ func NewKafkaTarget(id string, args KafkaArgs, loggerOnce logger.LogOnce) (*Kafk } target := &KafkaTarget{ - id: event.TargetID{ID: id, Name: "kafka"}, - args: args, - store: queueStore, - loggerOnce: loggerOnce, - quitCh: make(chan struct{}), + id: event.TargetID{ID: id, Name: "kafka"}, + args: args, + store: queueStore, + loggerOnce: loggerOnce, + quitCh: make(chan struct{}), + brokerConns: make(map[string]net.Conn, len(args.Brokers)), } if target.store != nil { diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index 2e1f843b7..c31c333ce 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -77,20 +77,29 @@ type Config struct { LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` } -// Check if at least one broker in cluster is active -func (k Config) pingBrokers() (err error) { +func (h *Target) pingBrokers() (err error) { d := net.Dialer{Timeout: 1 * time.Second} - errs := make([]error, len(k.Brokers)) + errs := make([]error, len(h.kconfig.Brokers)) var wg sync.WaitGroup - for idx, broker := range k.Brokers { + for idx, broker := range h.kconfig.Brokers { broker := broker idx := idx wg.Add(1) go func(broker xnet.Host, idx int) { defer wg.Done() - - _, errs[idx] = d.Dial("tcp", broker.String()) + conn, ok := h.brokerConns[broker.String()] + if !ok || conn == nil { + conn, errs[idx] = d.Dial("tcp", broker.String()) + if errs[idx] != nil { + return + } + h.brokerConns[broker.String()] = conn + } + if _, errs[idx] = conn.Write([]byte("")); errs[idx] != nil { + conn.Close() + h.brokerConns[broker.String()] = nil + } }(broker, idx) } wg.Wait() @@ -98,7 +107,7 @@ func (k Config) pingBrokers() (err error) { var retErr error for _, err := range errs { if err == nil { - // if one broker is online its enough + // if one of them is active we are good. return nil } retErr = err @@ -129,9 +138,10 @@ type Target struct { initKafkaOnce once.Init initQueueStoreOnce once.Init - producer sarama.SyncProducer - kconfig Config - config *sarama.Config + producer sarama.SyncProducer + kconfig Config + config *sarama.Config + brokerConns map[string]net.Conn } func (h *Target) validate() error { @@ -262,7 +272,7 @@ func (h *Target) send(entry interface{}) error { // Init initialize kafka target func (h *Target) init() error { - if err := h.kconfig.pingBrokers(); err != nil { + if err := h.pingBrokers(); err != nil { return err } @@ -400,6 +410,12 @@ func (h *Target) Cancel() { h.producer.Close() } + for _, conn := range h.brokerConns { + if conn != nil { + conn.Close() + } + } + // Wait for messages to be sent... h.wg.Wait() } @@ -408,9 +424,10 @@ func (h *Target) Cancel() { // sends log over http to the specified endpoint func New(config Config) *Target { target := &Target{ - logCh: make(chan interface{}, config.QueueSize), - kconfig: config, - status: statusOffline, + logCh: make(chan interface{}, config.QueueSize), + kconfig: config, + status: statusOffline, + brokerConns: make(map[string]net.Conn, len(config.Brokers)), } return target }