mirror of
https://github.com/juanfont/headscale.git
synced 2026-05-05 03:56:10 +02:00
state: add HA health prober
Ping HA subnet routers each probe cycle and mark unresponsive nodes unhealthy. Reconnecting a node clears its unhealthy state since the fresh Noise session proves basic connectivity. Updates #2129 Updates #2902
This commit is contained in:
parent
786ce2dce8
commit
90e65ccd63
139
hscontrol/state/ha_health.go
Normal file
139
hscontrol/state/ha_health.go
Normal file
@ -0,0 +1,139 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"github.com/juanfont/headscale/hscontrol/types/change"
|
||||
"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
|
||||
"github.com/rs/zerolog/log"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/util/set"
|
||||
)
|
||||
|
||||
// HAHealthProber periodically pings HA subnet router nodes and
// triggers failover when a primary stops responding.
type HAHealthProber struct {
	// state provides ping registration/cancellation and the
	// primary-routes tracker used to flip node health.
	state *State
	// cfg carries the probe timeout (and interval) settings.
	cfg types.HARouteConfig
	// serverURL is the base URL used to build the ping callback URL
	// that probed nodes report back to.
	serverURL string
	// isConnected reports whether a node currently has an active map
	// session; offline nodes are skipped rather than probed.
	isConnected func(types.NodeID) bool
}
|
||||
|
||||
// NewHAHealthProber creates a prober that uses the given State for
|
||||
// ping tracking and primary route management.
|
||||
// isConnected should return true if a node has an active map session.
|
||||
func NewHAHealthProber(
|
||||
s *State,
|
||||
cfg types.HARouteConfig,
|
||||
serverURL string,
|
||||
isConnected func(types.NodeID) bool,
|
||||
) *HAHealthProber {
|
||||
return &HAHealthProber{
|
||||
state: s,
|
||||
cfg: cfg,
|
||||
serverURL: serverURL,
|
||||
isConnected: isConnected,
|
||||
}
|
||||
}
|
||||
|
||||
// ProbeOnce pings all HA subnet router nodes. PingNode changes are
|
||||
// dispatched immediately via dispatch so nodes can respond before the
|
||||
// timeout. Health-related policy changes are also dispatched inline.
|
||||
func (p *HAHealthProber) ProbeOnce(
|
||||
ctx context.Context,
|
||||
dispatch func(...change.Change),
|
||||
) {
|
||||
haNodes := p.state.primaryRoutes.HANodes()
|
||||
if len(haNodes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Deduplicate node IDs across prefixes.
|
||||
seen := make(set.Set[types.NodeID])
|
||||
|
||||
var nodeIDs []types.NodeID
|
||||
|
||||
for _, nodes := range haNodes {
|
||||
for _, id := range nodes {
|
||||
if !seen.Contains(id) {
|
||||
seen.Add(id)
|
||||
nodeIDs = append(nodeIDs, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Int("haNodes", len(nodeIDs)).
|
||||
Msg("HA health prober starting probe cycle")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
deadline := time.After(p.cfg.ProbeTimeout)
|
||||
|
||||
for _, id := range nodeIDs {
|
||||
if !p.isConnected(id) {
|
||||
log.Debug().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("HA probe: skipping offline node")
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
pingID, responseCh := p.state.RegisterPing(id)
|
||||
callbackURL := p.serverURL + "/machine/ping-response?id=" + pingID
|
||||
|
||||
dispatch(change.PingNode(id, &tailcfg.PingRequest{
|
||||
URL: callbackURL,
|
||||
}))
|
||||
|
||||
wg.Go(func() {
|
||||
select {
|
||||
case latency := <-responseCh:
|
||||
log.Debug().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Dur("latency", latency).
|
||||
Msg("HA probe: node responded")
|
||||
|
||||
if p.state.primaryRoutes.SetNodeHealthy(id, true) {
|
||||
dispatch(change.PolicyChange())
|
||||
|
||||
log.Info().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("HA probe: node recovered, recalculating primaries")
|
||||
}
|
||||
|
||||
case <-deadline:
|
||||
p.state.CancelPing(pingID)
|
||||
|
||||
if !p.isConnected(id) {
|
||||
log.Debug().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("HA probe: node went offline during probe, skipping")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Warn().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Dur("timeout", p.cfg.ProbeTimeout).
|
||||
Msg("HA probe: node did not respond")
|
||||
|
||||
if p.state.primaryRoutes.SetNodeHealthy(id, false) {
|
||||
dispatch(change.PolicyChange())
|
||||
|
||||
log.Info().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("HA probe: node unhealthy, triggering failover")
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
p.state.CancelPing(pingID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
@ -574,6 +574,10 @@ func (s *State) Connect(id types.NodeID) ([]change.Change, uint64) {
|
||||
|
||||
log.Info().EmbedObject(node).Msg("node connected")
|
||||
|
||||
// Reconnecting clears any prior unhealthy state — the node proved
|
||||
// basic connectivity by establishing the Noise session.
|
||||
s.primaryRoutes.ClearUnhealthy(id)
|
||||
|
||||
// Use the node's current routes for primary route update.
|
||||
// AllApprovedRoutes() returns only the intersection of announced and approved routes.
|
||||
routeChange := s.primaryRoutes.SetRoutes(id, node.AllApprovedRoutes()...)
|
||||
@ -1184,6 +1188,11 @@ func (s *State) RoutesForPeer(
|
||||
return reduced
|
||||
}
|
||||
|
||||
// PrimaryRoutes returns the primary routes tracker.
// Exposes the State's internal tracker so callers (e.g. the HA health
// prober) can query and mutate primary-route health.
func (s *State) PrimaryRoutes() *routes.PrimaryRoutes {
	return s.primaryRoutes
}
|
||||
|
||||
// PrimaryRoutesString returns a string representation of all primary routes.
|
||||
func (s *State) PrimaryRoutesString() string {
|
||||
return s.primaryRoutes.String()
|
||||
|
||||
@ -60,6 +60,22 @@ type EphemeralConfig struct {
|
||||
InactivityTimeout time.Duration
|
||||
}
|
||||
|
||||
// HARouteConfig contains configuration for HA subnet router health probing.
type HARouteConfig struct {
	// ProbeInterval is how often HA subnet routers are probed.
	// A zero or negative duration disables probing.
	// NOTE(review): config validation appears to require >= 2s when
	// enabled — confirm against validateServerConfig.
	ProbeInterval time.Duration

	// ProbeTimeout is the maximum time to wait for a probe response
	// before declaring a node unhealthy. Must be less than ProbeInterval.
	ProbeTimeout time.Duration
}
|
||||
|
||||
// RouteConfig contains configuration for route behaviour.
type RouteConfig struct {
	// HA configures health probing for highly-available subnet routers.
	HA HARouteConfig
}
|
||||
|
||||
// NodeConfig contains configuration for node lifecycle and expiry.
|
||||
type NodeConfig struct {
|
||||
// Expiry is the default key expiry duration for non-tagged nodes.
|
||||
@ -70,6 +86,9 @@ type NodeConfig struct {
|
||||
|
||||
// Ephemeral contains configuration for ephemeral node lifecycle.
|
||||
Ephemeral EphemeralConfig
|
||||
|
||||
// Routes contains configuration for route behaviour.
|
||||
Routes RouteConfig
|
||||
}
|
||||
|
||||
// Config contains the initial Headscale configuration.
|
||||
@ -414,6 +433,8 @@ func LoadConfig(path string, isFile bool) error {
|
||||
|
||||
viper.SetDefault("node.expiry", "0")
|
||||
viper.SetDefault("node.ephemeral.inactivity_timeout", "120s")
|
||||
viper.SetDefault("node.routes.ha.probe_interval", "10s")
|
||||
viper.SetDefault("node.routes.ha.probe_timeout", "5s")
|
||||
|
||||
viper.SetDefault("tuning.notifier_send_timeout", "800ms")
|
||||
viper.SetDefault("tuning.batch_change_delay", "800ms")
|
||||
@ -576,6 +597,34 @@ func validateServerConfig() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Validate HA health probing parameters
|
||||
if haInterval := viper.GetDuration(
|
||||
"node.routes.ha.probe_interval",
|
||||
); haInterval > 0 {
|
||||
if haInterval < 2*time.Second {
|
||||
errorText += fmt.Sprintf(
|
||||
"Fatal config error: node.routes.ha.probe_interval (%s) must be >= 2s\n",
|
||||
haInterval,
|
||||
)
|
||||
}
|
||||
|
||||
haTimeout := viper.GetDuration("node.routes.ha.probe_timeout")
|
||||
if haTimeout < 1*time.Second {
|
||||
errorText += fmt.Sprintf(
|
||||
"Fatal config error: node.routes.ha.probe_timeout (%s) must be >= 1s\n",
|
||||
haTimeout,
|
||||
)
|
||||
}
|
||||
|
||||
if haTimeout >= haInterval {
|
||||
errorText += fmt.Sprintf(
|
||||
"Fatal config error: node.routes.ha.probe_timeout (%s) must be less than node.routes.ha.probe_interval (%s)\n",
|
||||
haTimeout,
|
||||
haInterval,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate tuning parameters
|
||||
if size := viper.GetInt("tuning.node_store_batch_size"); size <= 0 {
|
||||
errorText += fmt.Sprintf(
|
||||
@ -1129,6 +1178,12 @@ func LoadServerConfig() (*Config, error) {
|
||||
Ephemeral: EphemeralConfig{
|
||||
InactivityTimeout: resolveEphemeralInactivityTimeout(),
|
||||
},
|
||||
Routes: RouteConfig{
|
||||
HA: HARouteConfig{
|
||||
ProbeInterval: viper.GetDuration("node.routes.ha.probe_interval"),
|
||||
ProbeTimeout: viper.GetDuration("node.routes.ha.probe_timeout"),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
Database: databaseConfig(),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user