diff --git a/internal/logger/config.go b/internal/logger/config.go index ec058cd05..7eb5265f6 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -253,6 +253,13 @@ func NewConfig() Config { return cfg } +func getCfgVal(envName, key, defaultValue string) string { + if key != config.Default { + envName = envName + config.Default + key + } + return env.Get(envName, defaultValue) +} + func lookupLegacyConfigForSubSys(subSys string) Config { cfg := NewConfig() switch subSys { @@ -269,11 +276,7 @@ func lookupLegacyConfigForSubSys(subSys string) Config { // Load HTTP logger from the environment if found for _, target := range loggerTargets { - endpointEnv := legacyEnvLoggerHTTPEndpoint - if target != config.Default { - endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target - } - endpoint := env.Get(endpointEnv, "") + endpoint := getCfgVal(legacyEnvLoggerHTTPEndpoint, target, "") if endpoint == "" { continue } @@ -296,11 +299,7 @@ func lookupLegacyConfigForSubSys(subSys string) Config { } for _, target := range loggerAuditTargets { - endpointEnv := legacyEnvAuditLoggerHTTPEndpoint - if target != config.Default { - endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target - } - endpoint := env.Get(endpointEnv, "") + endpoint := getCfgVal(legacyEnvAuditLoggerHTTPEndpoint, target, "") if endpoint == "" { continue } @@ -316,11 +315,8 @@ func lookupLegacyConfigForSubSys(subSys string) Config { func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { for k, kv := range config.Merge(scfg[config.AuditKafkaSubSys], EnvKafkaEnable, DefaultAuditKafkaKVS) { - enableEnv := EnvKafkaEnable - if k != config.Default { - enableEnv = enableEnv + config.Default + k - } - enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) + enabledCfgVal := getCfgVal(EnvKafkaEnable, k, kv.Get(config.Enable)) + enabled, err := config.ParseBool(enabledCfgVal) if err != nil { return cfg, err } @@ -328,11 +324,7 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { continue } var brokers []xnet.Host - brokersEnv := EnvKafkaBrokers - if k != config.Default { - brokersEnv = brokersEnv + config.Default + k - } - kafkaBrokers := env.Get(brokersEnv, kv.Get(KafkaBrokers)) + kafkaBrokers := getCfgVal(EnvKafkaBrokers, k, kv.Get(KafkaBrokers)) if len(kafkaBrokers) == 0 { return cfg, config.Errorf("kafka 'brokers' cannot be empty") } @@ -348,90 +340,35 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { return cfg, err } - clientAuthEnv := EnvKafkaTLSClientAuth - if k != config.Default { - clientAuthEnv = clientAuthEnv + config.Default + k - } - clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(KafkaTLSClientAuth))) + clientAuthCfgVal := getCfgVal(EnvKafkaTLSClientAuth, k, kv.Get(KafkaTLSClientAuth)) + clientAuth, err := strconv.Atoi(clientAuthCfgVal) if err != nil { return cfg, err } - topicEnv := EnvKafkaTopic - if k != config.Default { - topicEnv = topicEnv + config.Default + k - } - - versionEnv := EnvKafkaVersion - if k != config.Default { - versionEnv = versionEnv + config.Default + k - } - kafkaArgs := kafka.Config{ Enabled: enabled, Brokers: brokers, - Topic: env.Get(topicEnv, kv.Get(KafkaTopic)), - Version: env.Get(versionEnv, kv.Get(KafkaVersion)), + Topic: getCfgVal(EnvKafkaTopic, k, kv.Get(KafkaTopic)), + Version: getCfgVal(EnvKafkaVersion, k, kv.Get(KafkaVersion)), } - tlsEnableEnv := EnvKafkaTLS - if k != config.Default { - tlsEnableEnv = tlsEnableEnv + config.Default + k - } - tlsSkipVerifyEnv := EnvKafkaTLSSkipVerify - if k != config.Default { - tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k - } - - tlsClientTLSCertEnv := EnvKafkaClientTLSCert - if k != config.Default { - tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k - } - - tlsClientTLSKeyEnv := EnvKafkaClientTLSKey - if k != config.Default { - tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k - } - - kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(KafkaTLS)) == config.EnableOn - kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn + kafkaArgs.TLS.Enable = getCfgVal(EnvKafkaTLS, k, kv.Get(KafkaTLS)) == config.EnableOn + kafkaArgs.TLS.SkipVerify = getCfgVal(EnvKafkaTLSSkipVerify, k, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth) - kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(KafkaClientTLSCert)) - kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(KafkaClientTLSKey)) + kafkaArgs.TLS.ClientTLSCert = getCfgVal(EnvKafkaClientTLSCert, k, kv.Get(KafkaClientTLSCert)) + kafkaArgs.TLS.ClientTLSKey = getCfgVal(EnvKafkaClientTLSKey, k, kv.Get(KafkaClientTLSKey)) - saslEnableEnv := EnvKafkaSASLEnable - if k != config.Default { - saslEnableEnv = saslEnableEnv + config.Default + k - } - saslUsernameEnv := EnvKafkaSASLUsername - if k != config.Default { - saslUsernameEnv = saslUsernameEnv + config.Default + k - } - saslPasswordEnv := EnvKafkaSASLPassword - if k != config.Default { - saslPasswordEnv = saslPasswordEnv + config.Default + k - } - saslMechanismEnv := EnvKafkaSASLMechanism - if k != config.Default { - saslMechanismEnv = saslMechanismEnv + config.Default + k - } - kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(KafkaSASL)) == config.EnableOn - kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(KafkaSASLUsername)) - kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword)) - kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism)) + kafkaArgs.SASL.Enable = getCfgVal(EnvKafkaSASLEnable, k, kv.Get(KafkaSASL)) == config.EnableOn + kafkaArgs.SASL.User = getCfgVal(EnvKafkaSASLUsername, k, kv.Get(KafkaSASLUsername)) + kafkaArgs.SASL.Password = getCfgVal(EnvKafkaSASLPassword, k, kv.Get(KafkaSASLPassword)) + kafkaArgs.SASL.Mechanism = getCfgVal(EnvKafkaSASLMechanism, k, kv.Get(KafkaSASLMechanism)) - queueDirEnv := EnvKafkaQueueDir - if k != config.Default { - queueDirEnv = queueDirEnv + config.Default + k - } - kafkaArgs.QueueDir = env.Get(queueDirEnv, kv.Get(KafkaQueueDir)) + kafkaArgs.QueueDir = getCfgVal(EnvKafkaQueueDir, k, kv.Get(KafkaQueueDir)) - queueSizeEnv := EnvKafkaQueueSize - if k != config.Default { - queueSizeEnv = queueSizeEnv + config.Default + k - } - queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, kv.Get(KafkaQueueSize))) + queueSizeCfgVal := getCfgVal(EnvKafkaQueueSize, k, kv.Get(KafkaQueueSize)) + queueSize, err := strconv.Atoi(queueSizeCfgVal) if err != nil { return cfg, err } @@ -464,59 +401,38 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { // legacy environment variables, ignore. continue } - enableEnv := EnvLoggerWebhookEnable - if target != config.Default { - enableEnv = EnvLoggerWebhookEnable + config.Default + target - } - enable, err := config.ParseBool(env.Get(enableEnv, "")) + + enableCfgVal := getCfgVal(EnvLoggerWebhookEnable, target, "") + enable, err := config.ParseBool(enableCfgVal) if err != nil || !enable { continue } - endpointEnv := EnvLoggerWebhookEndpoint - if target != config.Default { - endpointEnv = EnvLoggerWebhookEndpoint + config.Default + target - } - authTokenEnv := EnvLoggerWebhookAuthToken - if target != config.Default { - authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target - } - clientCertEnv := EnvLoggerWebhookClientCert - if target != config.Default { - clientCertEnv = EnvLoggerWebhookClientCert + config.Default + target - } - clientKeyEnv := EnvLoggerWebhookClientKey - if target != config.Default { - clientKeyEnv = EnvLoggerWebhookClientKey + config.Default + target - } - err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, "")) + + clientCert := getCfgVal(EnvLoggerWebhookClientCert, target, "") + clientKey := getCfgVal(EnvLoggerWebhookClientKey, target, "") + err = config.EnsureCertAndKey(clientCert, clientKey) if err != nil { return cfg, err } - proxyEnv := EnvLoggerWebhookProxy - queueSizeEnv := EnvLoggerWebhookQueueSize - if target != config.Default { - queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target - } - queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + + queueSizeCfgVal := getCfgVal(EnvLoggerWebhookQueueSize, target, "100000") + queueSize, err := strconv.Atoi(queueSizeCfgVal) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } - queueDirEnv := EnvLoggerWebhookQueueDir - if target != config.Default { - queueDirEnv = EnvLoggerWebhookQueueDir + config.Default + target - } + cfg.HTTP[target] = http.Config{ Enabled: true, - Endpoint: env.Get(endpointEnv, ""), - AuthToken: env.Get(authTokenEnv, ""), - ClientCert: env.Get(clientCertEnv, ""), - ClientKey: env.Get(clientKeyEnv, ""), - Proxy: env.Get(proxyEnv, ""), + Endpoint: getCfgVal(EnvLoggerWebhookEndpoint, target, ""), + AuthToken: getCfgVal(EnvLoggerWebhookAuthToken, target, ""), + ClientCert: clientCert, + ClientKey: clientKey, + Proxy: getCfgVal(EnvLoggerWebhookProxy, target, ""), QueueSize: queueSize, - QueueDir: env.Get(queueDirEnv, ""), + QueueDir: getCfgVal(EnvLoggerWebhookQueueDir, target, ""), Name: loggerTargetNamePrefix + target, } } @@ -586,57 +502,35 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { // legacy environment variables, ignore. continue } - enableEnv := EnvAuditWebhookEnable - if target != config.Default { - enableEnv = EnvAuditWebhookEnable + config.Default + target - } - enable, err := config.ParseBool(env.Get(enableEnv, "")) + enable, err := config.ParseBool(getCfgVal(EnvAuditWebhookEnable, target, "")) if err != nil || !enable { continue } - endpointEnv := EnvAuditWebhookEndpoint - if target != config.Default { - endpointEnv = EnvAuditWebhookEndpoint + config.Default + target - } - authTokenEnv := EnvAuditWebhookAuthToken - if target != config.Default { - authTokenEnv = EnvAuditWebhookAuthToken + config.Default + target - } - clientCertEnv := EnvAuditWebhookClientCert - if target != config.Default { - clientCertEnv = EnvAuditWebhookClientCert + config.Default + target - } - clientKeyEnv := EnvAuditWebhookClientKey - if target != config.Default { - clientKeyEnv = EnvAuditWebhookClientKey + config.Default + target - } - err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, "")) + + clientCert := getCfgVal(EnvAuditWebhookClientCert, target, "") + clientKey := getCfgVal(EnvAuditWebhookClientKey, target, "") + err = config.EnsureCertAndKey(clientCert, clientKey) if err != nil { return cfg, err } - queueSizeEnv := EnvAuditWebhookQueueSize - if target != config.Default { - queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target - } - queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) + + queueSizeCfgVal := getCfgVal(EnvAuditWebhookQueueSize, target, "100000") + queueSize, err := strconv.Atoi(queueSizeCfgVal) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } - queueDirEnv := EnvAuditWebhookQueueDir - if target != config.Default { - queueDirEnv = EnvAuditWebhookQueueDir + config.Default + target - } + cfg.AuditWebhook[target] = http.Config{ Enabled: true, - Endpoint: env.Get(endpointEnv, ""), - AuthToken: env.Get(authTokenEnv, ""), - ClientCert: env.Get(clientCertEnv, ""), - ClientKey: env.Get(clientKeyEnv, ""), + Endpoint: getCfgVal(EnvAuditWebhookEndpoint, target, ""), + AuthToken: getCfgVal(EnvAuditWebhookAuthToken, target, ""), + ClientCert: clientCert, + ClientKey: clientKey, QueueSize: queueSize, - QueueDir: env.Get(queueDirEnv, ""), + QueueDir: getCfgVal(EnvAuditWebhookQueueDir, target, ""), Name: auditTargetNamePrefix + target, } } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index ebc195e70..4b70d8988 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -127,7 +127,10 @@ func (h *Target) String() string { // IsOnline returns true if the target is reachable. func (h *Target) IsOnline(ctx context.Context) bool { - return h.isAlive(ctx) == nil + if err := h.checkAlive(ctx); err != nil { + return !xnet.IsNetworkOrHostDown(err, false) + } + return true } // Stats returns the target statistics. @@ -145,7 +148,7 @@ func (h *Target) Stats() types.TargetStats { } // This will check if we can reach the remote. -func (h *Target) isAlive(ctx context.Context) (err error) { +func (h *Target) checkAlive(ctx context.Context) (err error) { return h.send(ctx, []byte(`{}`), 2*webhookCallTimeout) } @@ -179,8 +182,7 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) { return errors.New("target is closed") } - err = h.isAlive(ctx) - if err != nil { + if !h.IsOnline(ctx) { // Start a goroutine that will continue to check if we can reach h.revive.Do(func() { go func() { @@ -190,7 +192,7 @@ func (h *Target) initLogChannel(ctx context.Context) (err error) { if atomic.LoadInt32(&h.status) != statusOffline { return } - if err := h.isAlive(ctx); err == nil { + if h.IsOnline(ctx) { // We are online. if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { h.workerStartMu.Lock() @@ -223,7 +225,7 @@ func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, bytes.NewReader(payload)) if err != nil { - return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err) + return fmt.Errorf("invalid configuration for '%s'; %v", h.config.Endpoint, err) } req.Header.Set(xhttp.ContentType, "application/json") req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) diff --git a/internal/store/store.go b/internal/store/store.go index c90c609bf..14b5889d8 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -54,7 +54,7 @@ type Store[I any] interface { } // replayItems - Reads the items from the store and replays. -func replayItems[I any](store Store[I], doneCh <-chan struct{}, logger logger, id string) <-chan string { +func replayItems[I any](store Store[I], doneCh <-chan struct{}, log logger, id string) <-chan string { itemKeyCh := make(chan string) go func() { @@ -66,7 +66,7 @@ func replayItems[I any](store Store[I], doneCh <-chan struct{}, logger logger, i for { names, err := store.List() if err != nil { - logger(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) + log(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) } else { for _, name := range names { select {