From 09e9959478ea5027caed2743b064097cf6d5a90a Mon Sep 17 00:00:00 2001
From: Douglas De Toni Machado
Date: Fri, 30 Aug 2024 22:49:33 -0300
Subject: [PATCH] add tcp healthcheck

---
 pkg/config/dynamic/tcp_config.go  |  16 +++-
 pkg/config/runtime/runtime_tcp.go |  19 +++++
 pkg/healthcheck/tcp.go            | 120 ++++++++++++++++++++++++++++++
 pkg/server/service/tcp/service.go |  27 ++++++-
 pkg/tcp/wrr_load_balancer.go      |  35 +++++++++
 5 files changed, 213 insertions(+), 4 deletions(-)
 create mode 100644 pkg/healthcheck/tcp.go

diff --git a/pkg/config/dynamic/tcp_config.go b/pkg/config/dynamic/tcp_config.go
index 585690c4b..093568e15 100644
--- a/pkg/config/dynamic/tcp_config.go
+++ b/pkg/config/dynamic/tcp_config.go
@@ -94,7 +94,8 @@ type TCPServersLoadBalancer struct {
     // connection. It is a duration in milliseconds, defaulting to 100. A negative value
     // means an infinite deadline (i.e. the reading capability is never closed).
     // Deprecated: use ServersTransport to configure the TerminationDelay instead.
-    TerminationDelay *int `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty" export:"true"`
+    TerminationDelay *int                  `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty" export:"true"`
+    HealthCheck      *TCPServerHealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"`
 }

 // Mergeable tells if the given service is mergeable.
@@ -172,3 +173,16 @@ func (t *TCPServersTransport) SetDefaults() {
     t.DialKeepAlive = ptypes.Duration(15 * time.Second)
     t.TerminationDelay = ptypes.Duration(100 * time.Millisecond)
 }
+
+// +k8s:deepcopy-gen=true
+
+// TCPServerHealthCheck holds the TCP server health check configuration.
+type TCPServerHealthCheck struct {
+    Address  string          `json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty" label:"-"`
+    Port     string          `json:"-" toml:"-" yaml:"-"`
+    TLS      bool            `json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty"`
+    Interval ptypes.Duration `json:"interval,omitempty" toml:"interval,omitempty" yaml:"interval,omitempty" export:"true"`
+    Timeout  ptypes.Duration `json:"timeout,omitempty" toml:"timeout,omitempty" yaml:"timeout,omitempty" export:"true"`
+    Payload  string          `json:"payload,omitempty" toml:"payload,omitempty" yaml:"payload,omitempty" export:"true"`
+    Expected string          `json:"expected,omitempty" toml:"expected,omitempty" yaml:"expected,omitempty" export:"true"`
+}
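For illustration, here is a minimal sketch of how the new healthCheck block could be populated from code, using only the fields whose behaviour is defined later in this patch. The address, interval, timeout and the Redis-style PING/PONG strings are made-up example values; the patch itself defines no defaults for Interval or Timeout.

package main

import (
    "fmt"
    "time"

    ptypes "github.com/traefik/paerser/types"
    "github.com/traefik/traefik/v3/pkg/config/dynamic"
)

func main() {
    // Example values only: the fields exist in the patch, but Interval and
    // Timeout have to be set explicitly because no defaults are provided.
    lb := &dynamic.TCPServersLoadBalancer{
        Servers: []dynamic.TCPServer{{Address: "10.0.0.1:6379"}},
        HealthCheck: &dynamic.TCPServerHealthCheck{
            Interval: ptypes.Duration(10 * time.Second),
            Timeout:  ptypes.Duration(3 * time.Second),
            // Optional: bytes written right after the connection is established.
            Payload: "PING\r\n",
            // Optional: the first len(Expected) bytes of the reply must match exactly.
            Expected: "+PONG",
        },
    }

    fmt.Println(time.Duration(lb.HealthCheck.Interval), time.Duration(lb.HealthCheck.Timeout))
}

Address, Port and TLS are declared on the struct as well, but they are not consumed by the checker added below, which always dials the resolved server address.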
diff --git a/pkg/config/runtime/runtime_tcp.go b/pkg/config/runtime/runtime_tcp.go
index 1c213f7b1..b3743794a 100644
--- a/pkg/config/runtime/runtime_tcp.go
+++ b/pkg/config/runtime/runtime_tcp.go
@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "slices"
+    "sync/atomic"

     "github.com/rs/zerolog/log"
     "github.com/traefik/traefik/v3/pkg/config/dynamic"
@@ -87,6 +88,8 @@ type TCPServiceInfo struct {
     // It is the caller's responsibility to set the initial status.
     Status string   `json:"status,omitempty"`
     UsedBy []string `json:"usedBy,omitempty"` // list of routers using that service
+
+    serverStatus map[string]*atomic.Bool // keyed by server URL, UP or DOWN
 }

 // AddError adds err to s.Err, if it does not already exist.
@@ -110,6 +113,22 @@ func (s *TCPServiceInfo) AddError(err error, critical bool) {
     }
 }

+// UpdateServerStatus sets the status of the server in the TCPServiceInfo.
+func (s *TCPServiceInfo) UpdateServerStatus(server string, status bool) {
+    if s.serverStatus == nil {
+        s.serverStatus = make(map[string]*atomic.Bool)
+    }
+
+    if srvStatus, exists := s.serverStatus[server]; exists {
+        srvStatus.Store(status)
+        return
+    }
+
+    v := &atomic.Bool{}
+    v.Store(status)
+    s.serverStatus[server] = v
+}
+
 // TCPMiddlewareInfo holds information about a currently running middleware.
 type TCPMiddlewareInfo struct {
     *dynamic.TCPMiddleware // dynamic configuration
diff --git a/pkg/healthcheck/tcp.go b/pkg/healthcheck/tcp.go
new file mode 100644
index 000000000..35973cd06
--- /dev/null
+++ b/pkg/healthcheck/tcp.go
@@ -0,0 +1,120 @@
+package healthcheck
+
+import (
+    "context"
+    "errors"
+    "net"
+    "time"
+
+    "github.com/rs/zerolog/log"
+    "github.com/traefik/traefik/v3/pkg/config/dynamic"
+    "github.com/traefik/traefik/v3/pkg/config/runtime"
+)
+
+type ServiceTCPHealthChecker struct {
+    balancer StatusSetter
+    info     *runtime.TCPServiceInfo
+
+    config   *dynamic.TCPServerHealthCheck
+    interval time.Duration
+    timeout  time.Duration
+
+    metrics metricsHealthCheck
+
+    targets     map[string]*net.TCPAddr
+    serviceName string
+}
+
+func NewServiceTCPHealthChecker(metrics metricsHealthCheck, config *dynamic.TCPServerHealthCheck, service StatusSetter, info *runtime.TCPServiceInfo, targets map[string]*net.TCPAddr, serviceName string) *ServiceTCPHealthChecker {
+    return &ServiceTCPHealthChecker{
+        balancer:    service,
+        info:        info,
+        config:      config,
+        interval:    time.Duration(config.Interval),
+        timeout:     time.Duration(config.Timeout),
+        metrics:     metrics,
+        targets:     targets,
+        serviceName: serviceName,
+    }
+}
+
+func (thc *ServiceTCPHealthChecker) Launch(ctx context.Context) {
+    ticker := time.NewTicker(thc.interval)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+
+        case <-ticker.C:
+            for proxyName, target := range thc.targets {
+                select {
+                case <-ctx.Done():
+                    return
+                default:
+                }
+
+                isUp := true
+                serverUpMetricValue := float64(1)
+
+                if err := thc.executeHealthCheck(ctx, thc.config, target); err != nil {
+                    // The context is canceled when the dynamic configuration is refreshed.
+                    if errors.Is(err, context.Canceled) {
+                        return
+                    }
+
+                    log.Ctx(ctx).Warn().
+                        Str("targetURL", target.String()).
+                        Err(err).
+                        Msg("Health check failed.")
+
+                    isUp = false
+                    serverUpMetricValue = float64(0)
+                }
+
+                thc.balancer.SetStatus(ctx, proxyName, isUp)
+
+                thc.info.UpdateServerStatus(target.String(), isUp)
+
+                thc.metrics.ServiceServerUpGauge().
+                    With("service", thc.serviceName, "url", target.String()).
+                    Set(serverUpMetricValue)
+            }
+        }
+    }
+}
+
+func (thc *ServiceTCPHealthChecker) executeHealthCheck(ctx context.Context, config *dynamic.TCPServerHealthCheck, target *net.TCPAddr) error {
+    ctx, cancel := context.WithTimeout(ctx, thc.timeout)
+    defer cancel()
+
+    dialer := net.Dialer{}
+    conn, err := dialer.DialContext(ctx, "tcp", target.String())
+    if err != nil {
+        return err
+    }
+
+    defer conn.Close()
+
+    if config.Payload != "" {
+        _, err = conn.Write([]byte(config.Payload))
+        if err != nil {
+            return err
+        }
+    }
+
+    if config.Expected != "" {
+        buf := make([]byte, len(config.Expected))
+        _, err = conn.Read(buf)
+        if err != nil {
+            return err
+        }
+
+        if string(buf) != config.Expected {
+            return errors.New("unexpected response")
+        }
+    }
+
+    return nil
+}
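To make the Payload/Expected semantics above concrete: the probe is a plain dial, write, read and compare exchange over the raw connection. The standalone sketch below reproduces that exchange against a throwaway local listener; it does not use the healthcheck package, and the listener, addresses and PING/PONG strings are made up for the demo.

package main

import (
    "errors"
    "fmt"
    "io"
    "net"
    "time"
)

// probe performs the same sequence as executeHealthCheck: dial the target,
// optionally send a payload, then optionally read len(expected) bytes and
// compare them with the expected string.
func probe(addr, payload, expected string, timeout time.Duration) error {
    conn, err := net.DialTimeout("tcp", addr, timeout)
    if err != nil {
        return err
    }
    defer conn.Close()

    _ = conn.SetDeadline(time.Now().Add(timeout))

    if payload != "" {
        if _, err := conn.Write([]byte(payload)); err != nil {
            return err
        }
    }

    if expected != "" {
        buf := make([]byte, len(expected))
        if _, err := io.ReadFull(conn, buf); err != nil {
            return err
        }
        if string(buf) != expected {
            return errors.New("unexpected response")
        }
    }

    return nil
}

func main() {
    // Throwaway backend that answers every connection with "+PONG".
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    go func() {
        for {
            c, err := ln.Accept()
            if err != nil {
                return
            }
            _, _ = c.Read(make([]byte, 64)) // consume the payload
            _, _ = c.Write([]byte("+PONG"))
            _ = c.Close()
        }
    }()

    err = probe(ln.Addr().String(), "PING\r\n", "+PONG", 3*time.Second)
    fmt.Println("healthy:", err == nil)
}

In the patch itself, the timeout is applied through context.WithTimeout around DialContext, and the response is read with a single conn.Read call rather than io.ReadFull.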
diff --git a/pkg/server/service/tcp/service.go b/pkg/server/service/tcp/service.go
index 64c173a51..c8f08f719 100644
--- a/pkg/server/service/tcp/service.go
+++ b/pkg/server/service/tcp/service.go
@@ -10,6 +10,7 @@ import (
     "github.com/rs/zerolog/log"
     "github.com/traefik/traefik/v3/pkg/config/runtime"
+    "github.com/traefik/traefik/v3/pkg/healthcheck"
     "github.com/traefik/traefik/v3/pkg/logs"
     "github.com/traefik/traefik/v3/pkg/server/provider"
     "github.com/traefik/traefik/v3/pkg/tcp"
 )
@@ -18,9 +19,10 @@
 // Manager is the TCPHandlers factory.
 type Manager struct {
-    dialerManager *tcp.DialerManager
-    configs       map[string]*runtime.TCPServiceInfo
-    rand          *rand.Rand // For the initial shuffling of load-balancers.
+    dialerManager  *tcp.DialerManager
+    configs        map[string]*runtime.TCPServiceInfo
+    rand           *rand.Rand // For the initial shuffling of load-balancers.
+    healthCheckers map[string]*healthcheck.ServiceTCPHealthChecker
 }

 // NewManager creates a new manager.
@@ -62,6 +64,8 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
             conf.LoadBalancer.ServersTransport = provider.GetQualifiedName(ctx, conf.LoadBalancer.ServersTransport)
         }

+        healthCheckTargets := make(map[string]*net.TCPAddr, len(conf.LoadBalancer.Servers))
+
         for index, server := range shuffle(conf.LoadBalancer.Servers, m.rand) {
             srvLogger := logger.With().
                 Int(logs.ServerIndex, index).
@@ -91,10 +95,27 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
                 continue
             }

+            tcpAddr, err := net.ResolveTCPAddr("tcp", server.Address)
+            if err != nil {
+                srvLogger.Error().Err(err).Msg("Failed to resolve TCP address")
+                continue
+            }
+            healthCheckTargets[server.Address] = tcpAddr
+
             loadBalancer.AddServer(handler)
             logger.Debug().Msg("Creating TCP server")
         }

+        if conf.LoadBalancer.HealthCheck != nil {
+            m.healthCheckers[serviceName] = healthcheck.NewServiceTCPHealthChecker(
+                nil,
+                conf.LoadBalancer.HealthCheck,
+                loadBalancer,
+                conf,
+                healthCheckTargets,
+                serviceQualifiedName)
+        }
+
         return loadBalancer, nil

     case conf.Weighted != nil:
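The hunks above register one checker per service in m.healthCheckers, but they do not show the map being allocated or the checkers being started. A minimal sketch of that missing wiring, assuming a hypothetical launchHealthCheckers helper added to the same package (the name, and the idea that the caller owns the context, are assumptions rather than part of this patch):

package tcp

import (
    "context"

    "github.com/traefik/traefik/v3/pkg/healthcheck"
)

// launchHealthCheckers is a hypothetical helper, not part of the patch.
// It starts every checker that BuildTCP registered; each probe loop stops
// when ctx is canceled. It assumes m.healthCheckers was allocated with
// make() before BuildTCP ran (for example where the Manager is constructed),
// since BuildTCP assigns into the map directly.
func (m *Manager) launchHealthCheckers(ctx context.Context) {
    for _, checker := range m.healthCheckers {
        go func(c *healthcheck.ServiceTCPHealthChecker) {
            // Launch blocks, probing every configured interval, until ctx is done.
            c.Launch(ctx)
        }(checker)
    }
}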
diff --git a/pkg/tcp/wrr_load_balancer.go b/pkg/tcp/wrr_load_balancer.go
index 93d43a4fb..b4d1e26cc 100644
--- a/pkg/tcp/wrr_load_balancer.go
+++ b/pkg/tcp/wrr_load_balancer.go
@@ -1,8 +1,10 @@
 package tcp

 import (
+    "context"
     "errors"
     "sync"
+    "sync/atomic"

     "github.com/rs/zerolog/log"
 )
@@ -20,6 +22,11 @@ type WRRLoadBalancer struct {
     lock          sync.Mutex
     currentWeight int
     index         int
+    // status is a record of which child services of the Balancer are healthy.
+    status map[string]*atomic.Bool
+    // updaters is the list of hooks that are run (to update the Balancer
+    // parent(s)), whenever the Balancer status changes.
+    updaters []func(bool)
 }

 // NewWRRLoadBalancer creates a new WRRLoadBalancer.
@@ -125,3 +132,31 @@ func (b *WRRLoadBalancer) next() (Handler, error) {
         }
     }
 }
+
+// SetStatus sets status (UP or DOWN) of a target server.
+func (b *WRRLoadBalancer) SetStatus(ctx context.Context, childName string, up bool) {
+    statusString := "DOWN"
+    if up {
+        statusString = "UP"
+    }
+
+    log.Ctx(ctx).Debug().Msgf("Setting status of %s to %s", childName, statusString)
+
+    currentStatus, exists := b.status[childName]
+    if !exists {
+        s := &atomic.Bool{}
+        s.Store(up)
+        b.status[childName] = s
+        return
+    }
+
+    if !currentStatus.CompareAndSwap(!up, up) {
+        log.Ctx(ctx).Debug().Msgf("Still %s, no need to propagate", statusString)
+        return
+    }
+
+    log.Ctx(ctx).Debug().Msgf("Propagating new %s status", statusString)
+    for _, fn := range b.updaters {
+        fn(up)
+    }
+}
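Nothing in the patch yet populates updaters or reads the recorded status map back when picking a server. As an illustration of how the hook list is meant to be used, here is a sketch of a registration method in the style of the HTTP weighted round-robin balancer; the method name and error behaviour are assumptions, not part of this change.

package tcp

import "errors"

// RegisterStatusUpdater is a hypothetical companion to SetStatus, not part of
// this patch. It appends fn to b.updaters so that a parent service wrapping
// this balancer is notified each time SetStatus records a status flip.
func (b *WRRLoadBalancer) RegisterStatusUpdater(fn func(up bool)) error {
    if fn == nil {
        return errors.New("nil status updater")
    }

    b.lock.Lock()
    defer b.lock.Unlock()
    b.updaters = append(b.updaters, fn)

    return nil
}

As with the Manager's healthCheckers map, the status map is assigned to without being allocated in the shown hunks, so the balancer's constructor (or a lazy check in SetStatus, as UpdateServerStatus does) would need to take care of that.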