add tcp healthcheck

This commit is contained in:
Douglas De Toni Machado 2024-08-30 22:49:33 -03:00
parent 24cede62ee
commit 09e9959478
5 changed files with 213 additions and 4 deletions

View File

@ -94,7 +94,8 @@ type TCPServersLoadBalancer struct {
// connection. It is a duration in milliseconds, defaulting to 100. A negative value
// means an infinite deadline (i.e. the reading capability is never closed).
// Deprecated: use ServersTransport to configure the TerminationDelay instead.
TerminationDelay *int `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty" export:"true"`
TerminationDelay *int `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty" export:"true"`
HealthCheck *TCPServerHealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"`
}
// Mergeable tells if the given service is mergeable.
@ -172,3 +173,16 @@ func (t *TCPServersTransport) SetDefaults() {
t.DialKeepAlive = ptypes.Duration(15 * time.Second)
t.TerminationDelay = ptypes.Duration(100 * time.Millisecond)
}
// +k8s:deepcopy-gen=true
// TCPServerHealthCheck holds the health check configuration for a TCP server.
type TCPServerHealthCheck struct {
Address string `json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty" label:"-"`
Port string `json:"-" toml:"-" yaml:"-"`
TLS bool `json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty"`
Interval ptypes.Duration `json:"interval,omitempty" toml:"interval,omitempty" yaml:"interval,omitempty" export:"true"`
Timeout ptypes.Duration `json:"timeout,omitempty" toml:"timeout,omitempty" yaml:"timeout,omitempty" export:"true"`
Payload string `json:"payload,omitempty" toml:"payload,omitempty" yaml:"payload,omitempty" export:"true"`
Expected string `json:"expected,omitempty" toml:"expected,omitempty" yaml:"expected,omitempty" export:"true"`
}
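
For orientation, here is a minimal sketch of how the new healthCheck block could be populated in dynamic configuration; the values (10s interval, 3s timeout, Redis-style PING/PONG payload) are illustrative assumptions, not defaults introduced by this commit.

package main

import (
	"fmt"
	"time"

	ptypes "github.com/traefik/paerser/types"
	"github.com/traefik/traefik/v3/pkg/config/dynamic"
)

func main() {
	// Illustrative values only; this commit does not define defaults for these fields.
	lb := &dynamic.TCPServersLoadBalancer{
		HealthCheck: &dynamic.TCPServerHealthCheck{
			Interval: ptypes.Duration(10 * time.Second),
			Timeout:  ptypes.Duration(3 * time.Second),
			Payload:  "PING\r\n",
			Expected: "+PONG",
		},
	}

	fmt.Printf("health check every %s with a %s timeout\n",
		time.Duration(lb.HealthCheck.Interval), time.Duration(lb.HealthCheck.Timeout))
}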

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"sync/atomic"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
@ -87,6 +88,8 @@ type TCPServiceInfo struct {
// It is the caller's responsibility to set the initial status.
Status string `json:"status,omitempty"`
UsedBy []string `json:"usedBy,omitempty"` // list of routers using that service
serverStatus map[string]*atomic.Bool // keyed by server address; true means the server is UP
}
// AddError adds err to s.Err, if it does not already exist.
@ -110,6 +113,22 @@ func (s *TCPServiceInfo) AddError(err error, critical bool) {
}
}
// UpdateServerStatus sets the health status of the given server in the TCPServiceInfo.
func (s *TCPServiceInfo) UpdateServerStatus(server string, status bool) {
if s.serverStatus == nil {
s.serverStatus = make(map[string]*atomic.Bool)
}
if current, exists := s.serverStatus[server]; exists {
current.Store(status)
return
}
v := &atomic.Bool{}
v.Store(status)
s.serverStatus[server] = v
}
// TCPMiddlewareInfo holds information about a currently running middleware.
type TCPMiddlewareInfo struct {
*dynamic.TCPMiddleware // dynamic configuration

120
pkg/healthcheck/tcp.go Normal file
View File

@ -0,0 +1,120 @@
package healthcheck
import (
"context"
"errors"
"net"
"time"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/config/runtime"
)
type ServiceTCPHealthChecker struct {
balancer StatusSetter
info *runtime.TCPServiceInfo
config *dynamic.TCPServerHealthCheck
interval time.Duration
timeout time.Duration
metrics metricsHealthCheck
targets map[string]*net.TCPAddr
serviceName string
}
func NewServiceTCPHealthChecker(metrics metricsHealthCheck, config *dynamic.TCPServerHealthCheck, service StatusSetter, info *runtime.TCPServiceInfo, targets map[string]*net.TCPAddr, serviceName string) *ServiceTCPHealthChecker {
interval := time.Duration(config.Interval)
if interval <= 0 {
// A non-positive interval would make time.NewTicker panic in Launch; fall back to a conservative default.
interval = 30 * time.Second
}
timeout := time.Duration(config.Timeout)
if timeout <= 0 {
timeout = 5 * time.Second
}
return &ServiceTCPHealthChecker{
balancer: service,
info: info,
config: config,
interval: interval,
timeout: timeout,
metrics: metrics,
targets: targets,
serviceName: serviceName,
}
}
func (thc *ServiceTCPHealthChecker) Launch(ctx context.Context) {
ticker := time.NewTicker(thc.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for proxyName, target := range thc.targets {
select {
case <-ctx.Done():
return
default:
}
isUp := true
serverUpMetricValue := float64(1)
if err := thc.executeHealthCheck(ctx, thc.config, target); err != nil {
// The context is canceled when the dynamic configuration is refreshed.
if errors.Is(err, context.Canceled) {
return
}
log.Ctx(ctx).Warn().
Str("targetURL", target.String()).
Err(err).
Msg("Health check failed.")
isUp = false
serverUpMetricValue = float64(0)
}
thc.balancer.SetStatus(ctx, proxyName, isUp)
thc.info.UpdateServerStatus(target.String(), isUp)
if thc.metrics != nil {
thc.metrics.ServiceServerUpGauge().
With("service", thc.serviceName, "url", target.String()).
Set(serverUpMetricValue)
}
}
}
}
}
func (thc *ServiceTCPHealthChecker) executeHealthCheck(ctx context.Context, config *dynamic.TCPServerHealthCheck, target *net.TCPAddr) error {
ctx, cancel := context.WithTimeout(ctx, thc.timeout)
defer cancel()
dialer := net.Dialer{}
conn, err := dialer.DialContext(ctx, "tcp", target.String())
if err != nil {
return err
}
defer conn.Close()
// DialContext only honors the context while establishing the connection,
// so apply the health check timeout to the read/write operations as well.
if deadline, ok := ctx.Deadline(); ok {
if err := conn.SetDeadline(deadline); err != nil {
return err
}
}
if config.Payload != "" {
if _, err := conn.Write([]byte(config.Payload)); err != nil {
return err
}
}
if config.Expected != "" {
// A single Read may return fewer bytes than requested; read until the
// buffer holds exactly len(Expected) bytes before comparing.
buf := make([]byte, len(config.Expected))
if _, err := io.ReadFull(conn, buf); err != nil {
return err
}
if string(buf) != config.Expected {
return errors.New("unexpected response from health check target")
}
}
return nil
}
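
To make the payload/expected exchange above concrete, the standalone sketch below (not part of the commit) performs the same write, read-full, and compare handshake against a throwaway listener; fakeBackend and the PING/PONG strings are hypothetical.

package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// fakeBackend is a hypothetical stand-in for a healthy TCP service:
// it answers "PING\n" with "PONG" on every connection.
func fakeBackend(ln net.Listener) {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		go func(c net.Conn) {
			defer c.Close()
			buf := make([]byte, len("PING\n"))
			if _, err := io.ReadFull(c, buf); err == nil && string(buf) == "PING\n" {
				_, _ = c.Write([]byte("PONG"))
			}
		}(conn)
	}
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	go fakeBackend(ln)

	// Same steps as executeHealthCheck: dial, bound the exchange with a deadline,
	// write the payload, read exactly len(expected) bytes, then compare.
	conn, err := net.DialTimeout("tcp", ln.Addr().String(), 3*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	_ = conn.SetDeadline(time.Now().Add(3 * time.Second))

	if _, err := conn.Write([]byte("PING\n")); err != nil {
		panic(err)
	}

	expected := "PONG"
	buf := make([]byte, len(expected))
	if _, err := io.ReadFull(conn, buf); err != nil {
		panic(err)
	}

	fmt.Println("healthy:", string(buf) == expected)
}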

View File

@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/healthcheck"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/server/provider"
"github.com/traefik/traefik/v3/pkg/tcp"
@ -18,9 +19,10 @@ import (
// Manager is the TCPHandlers factory.
type Manager struct {
dialerManager *tcp.DialerManager
configs map[string]*runtime.TCPServiceInfo
rand *rand.Rand // For the initial shuffling of load-balancers.
dialerManager *tcp.DialerManager
configs map[string]*runtime.TCPServiceInfo
rand *rand.Rand // For the initial shuffling of load-balancers.
healthCheckers map[string]*healthcheck.ServiceTCPHealthChecker
}
// NewManager creates a new manager.
@ -62,6 +64,8 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
conf.LoadBalancer.ServersTransport = provider.GetQualifiedName(ctx, conf.LoadBalancer.ServersTransport)
}
healthCheckTargets := make(map[string]*net.TCPAddr, len(conf.LoadBalancer.Servers))
for index, server := range shuffle(conf.LoadBalancer.Servers, m.rand) {
srvLogger := logger.With().
Int(logs.ServerIndex, index).
@ -91,10 +95,27 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
continue
}
tcpAddr, err := net.ResolveTCPAddr("tcp", server.Address)
if err != nil {
srvLogger.Error().Err(err).Msg("Failed to resolve TCP address")
continue
}
healthCheckTargets[server.Address] = tcpAddr
loadBalancer.AddServer(handler)
logger.Debug().Msg("Creating TCP server")
}
if conf.LoadBalancer.HealthCheck != nil {
if m.healthCheckers == nil {
m.healthCheckers = make(map[string]*healthcheck.ServiceTCPHealthChecker)
}
m.healthCheckers[serviceName] = healthcheck.NewServiceTCPHealthChecker(
nil, // metrics are not wired up for TCP health checks here
conf.LoadBalancer.HealthCheck,
loadBalancer,
conf,
healthCheckTargets,
serviceQualifiedName)
}
return loadBalancer, nil
case conf.Weighted != nil:
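
The checkers are registered in m.healthCheckers, but this diff does not show where their Launch method is invoked. Assuming the TCP manager mirrors the HTTP service manager, the launch hook could look roughly like the sketch below; LaunchHealthCheck is a hypothetical method name and is not part of the shown changes.

// LaunchHealthCheck starts one goroutine per registered TCP health checker.
// Sketch only: the method name and wiring are assumptions modeled on the HTTP
// service manager; the diff above only shows the checkers being registered.
func (m *Manager) LaunchHealthCheck(ctx context.Context) {
	for serviceName, checker := range m.healthCheckers {
		logger := log.Ctx(ctx).With().Str(logs.ServiceName, serviceName).Logger()
		go checker.Launch(logger.WithContext(ctx))
	}
}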

View File

@ -1,8 +1,10 @@
package tcp
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/rs/zerolog/log"
)
@ -20,6 +22,11 @@ type WRRLoadBalancer struct {
lock sync.Mutex
currentWeight int
index int
// status is a record of which child services of the Balancer are healthy.
status map[string]*atomic.Bool
// updaters is the list of hooks that are run (to update the Balancer
// parent(s)), whenever the Balancer status changes.
updaters []func(bool)
}
// NewWRRLoadBalancer creates a new WRRLoadBalancer.
@ -125,3 +132,31 @@ func (b *WRRLoadBalancer) next() (Handler, error) {
}
}
}
// SetStatus sets the status (UP or DOWN) of one of the balancer's target servers.
func (b *WRRLoadBalancer) SetStatus(ctx context.Context, childName string, up bool) {
b.lock.Lock()
defer b.lock.Unlock()
statusString := "DOWN"
if up {
statusString = "UP"
}
log.Ctx(ctx).Debug().Msgf("Setting status of %s to %s", childName, statusString)
if b.status == nil {
b.status = make(map[string]*atomic.Bool)
}
currentStatus, exists := b.status[childName]
if !exists {
s := &atomic.Bool{}
s.Store(up)
b.status[childName] = s
return
}
if !currentStatus.CompareAndSwap(!up, up) {
log.Ctx(ctx).Debug().Msgf("Still %s, no need to propagate", statusString)
return
}
log.Ctx(ctx).Debug().Msgf("Propagating new %s status", statusString)
for _, fn := range b.updaters {
fn(up)
}
}
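
The updaters field is documented as a list of hooks run when the balancer's status changes, but the registration side is not included in this diff. Assuming it follows the HTTP weighted balancer, a registration method might look like the sketch below; RegisterStatusUpdater here is an assumption, not code from this commit.

// RegisterStatusUpdater adds fn to the hooks that are run whenever the status
// of the balancer changes. Sketch only: this diff declares the updaters field
// but does not show how callers register their hooks.
func (b *WRRLoadBalancer) RegisterStatusUpdater(fn func(up bool)) {
	b.lock.Lock()
	defer b.lock.Unlock()

	b.updaters = append(b.updaters, fn)
}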