diff --git a/hscontrol/state/ha_health.go b/hscontrol/state/ha_health.go
new file mode 100644
index 00000000..07addee0
--- /dev/null
+++ b/hscontrol/state/ha_health.go
@@ -0,0 +1,139 @@
+package state
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/juanfont/headscale/hscontrol/types"
+	"github.com/juanfont/headscale/hscontrol/types/change"
+	"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
+	"github.com/rs/zerolog/log"
+	"tailscale.com/tailcfg"
+	"tailscale.com/util/set"
+)
+
+// HAHealthProber periodically pings HA subnet router nodes and
+// triggers failover when a primary stops responding.
+type HAHealthProber struct {
+	state       *State
+	cfg         types.HARouteConfig
+	serverURL   string
+	isConnected func(types.NodeID) bool
+}
+
+// NewHAHealthProber creates a prober that uses the given State for
+// ping tracking and primary route management.
+// isConnected should return true if a node has an active map session.
+func NewHAHealthProber(
+	s *State,
+	cfg types.HARouteConfig,
+	serverURL string,
+	isConnected func(types.NodeID) bool,
+) *HAHealthProber {
+	return &HAHealthProber{
+		state:       s,
+		cfg:         cfg,
+		serverURL:   serverURL,
+		isConnected: isConnected,
+	}
+}
+
+// ProbeOnce pings all HA subnet router nodes. PingNode changes are
+// dispatched immediately via dispatch so nodes can respond before the
+// timeout. Health-related policy changes are also dispatched inline.
+func (p *HAHealthProber) ProbeOnce(
+	ctx context.Context,
+	dispatch func(...change.Change),
+) {
+	haNodes := p.state.primaryRoutes.HANodes()
+	if len(haNodes) == 0 {
+		return
+	}
+
+	// Deduplicate node IDs across prefixes.
+	seen := make(set.Set[types.NodeID])
+
+	var nodeIDs []types.NodeID
+
+	for _, nodes := range haNodes {
+		for _, id := range nodes {
+			if !seen.Contains(id) {
+				seen.Add(id)
+				nodeIDs = append(nodeIDs, id)
+			}
+		}
+	}
+
+	log.Debug().
+		Int("haNodes", len(nodeIDs)).
+		Msg("HA health prober starting probe cycle")
+
+	var wg sync.WaitGroup
+
+	for _, id := range nodeIDs {
+		if !p.isConnected(id) {
+			log.Debug().
+				Uint64(zf.NodeID, id.Uint64()).
+				Msg("HA probe: skipping offline node")
+
+			continue
+		}
+
+		pingID, responseCh := p.state.RegisterPing(id)
+		callbackURL := p.serverURL + "/machine/ping-response?id=" + pingID
+
+		dispatch(change.PingNode(id, &tailcfg.PingRequest{
+			URL: callbackURL,
+		}))
+
+		wg.Go(func() {
+			select {
+			case latency := <-responseCh:
+				log.Debug().
+					Uint64(zf.NodeID, id.Uint64()).
+					Dur("latency", latency).
+					Msg("HA probe: node responded")
+
+				if p.state.primaryRoutes.SetNodeHealthy(id, true) {
+					dispatch(change.PolicyChange())
+
+					log.Info().
+						Uint64(zf.NodeID, id.Uint64()).
+						Msg("HA probe: node recovered, recalculating primaries")
+				}
+
+			// Each goroutine needs its own timer: a single time.After
+			// channel shared across goroutines fires for only one receiver.
+			case <-time.After(p.cfg.ProbeTimeout):
+				p.state.CancelPing(pingID)
+
+				if !p.isConnected(id) {
+					log.Debug().
+						Uint64(zf.NodeID, id.Uint64()).
+						Msg("HA probe: node went offline during probe, skipping")
+
+					return
+				}
+
+				log.Warn().
+					Uint64(zf.NodeID, id.Uint64()).
+					Dur("timeout", p.cfg.ProbeTimeout).
+					Msg("HA probe: node did not respond")
+
+				if p.state.primaryRoutes.SetNodeHealthy(id, false) {
+					dispatch(change.PolicyChange())
+
+					log.Info().
+						Uint64(zf.NodeID, id.Uint64()).
+						Msg("HA probe: node unhealthy, triggering failover")
+				}
+
+			case <-ctx.Done():
+				p.state.CancelPing(pingID)
+			}
+		})
+	}
+
+	wg.Wait()
+}
diff --git a/hscontrol/state/state.go b/hscontrol/state/state.go
index af9e984c..abd615e9 100644
--- a/hscontrol/state/state.go
+++ b/hscontrol/state/state.go
@@ -574,6 +574,10 @@ func (s *State) Connect(id types.NodeID) ([]change.Change, uint64) {
 
 	log.Info().EmbedObject(node).Msg("node connected")
 
+	// Reconnecting clears any prior unhealthy state — the node proved
+	// basic connectivity by establishing the Noise session.
+	s.primaryRoutes.ClearUnhealthy(id)
+
 	// Use the node's current routes for primary route update.
 	// AllApprovedRoutes() returns only the intersection of announced and approved routes.
 	routeChange := s.primaryRoutes.SetRoutes(id, node.AllApprovedRoutes()...)
@@ -1184,6 +1188,11 @@ func (s *State) RoutesForPeer(
 	return reduced
 }
 
+// PrimaryRoutes returns the primary routes tracker.
+func (s *State) PrimaryRoutes() *routes.PrimaryRoutes {
+	return s.primaryRoutes
+}
+
 // PrimaryRoutesString returns a string representation of all primary routes.
 func (s *State) PrimaryRoutesString() string {
 	return s.primaryRoutes.String()
diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go
index 849aae85..779380fc 100644
--- a/hscontrol/types/config.go
+++ b/hscontrol/types/config.go
@@ -60,6 +60,22 @@ type EphemeralConfig struct {
 	InactivityTimeout time.Duration
 }
 
+// HARouteConfig contains configuration for HA subnet router health probing.
+type HARouteConfig struct {
+	// ProbeInterval is how often HA subnet routers are probed.
+	// A zero or negative duration disables probing.
+	ProbeInterval time.Duration
+
+	// ProbeTimeout is the maximum time to wait for a probe response
+	// before declaring a node unhealthy. Must be less than ProbeInterval.
+	ProbeTimeout time.Duration
+}
+
+// RouteConfig contains configuration for route behaviour.
+type RouteConfig struct {
+	HA HARouteConfig
+}
+
 // NodeConfig contains configuration for node lifecycle and expiry.
 type NodeConfig struct {
 	// Expiry is the default key expiry duration for non-tagged nodes.
@@ -70,6 +86,9 @@ type NodeConfig struct {
 
 	// Ephemeral contains configuration for ephemeral node lifecycle.
 	Ephemeral EphemeralConfig
+
+	// Routes contains configuration for route behaviour.
+	Routes RouteConfig
 }
 
 // Config contains the initial Headscale configuration.
@@ -414,6 +433,8 @@ func LoadConfig(path string, isFile bool) error {
 
 	viper.SetDefault("node.expiry", "0")
 	viper.SetDefault("node.ephemeral.inactivity_timeout", "120s")
+	viper.SetDefault("node.routes.ha.probe_interval", "10s")
+	viper.SetDefault("node.routes.ha.probe_timeout", "5s")
 
 	viper.SetDefault("tuning.notifier_send_timeout", "800ms")
 	viper.SetDefault("tuning.batch_change_delay", "800ms")
@@ -576,6 +597,34 @@ func validateServerConfig() error {
 		}
 	}
 
+	// Validate HA health probing parameters
+	if haInterval := viper.GetDuration(
+		"node.routes.ha.probe_interval",
+	); haInterval > 0 {
+		if haInterval < 2*time.Second {
+			errorText += fmt.Sprintf(
+				"Fatal config error: node.routes.ha.probe_interval (%s) must be >= 2s\n",
+				haInterval,
+			)
+		}
+
+		haTimeout := viper.GetDuration("node.routes.ha.probe_timeout")
+		if haTimeout < 1*time.Second {
+			errorText += fmt.Sprintf(
+				"Fatal config error: node.routes.ha.probe_timeout (%s) must be >= 1s\n",
+				haTimeout,
+			)
+		}
+
+		if haTimeout >= haInterval {
+			errorText += fmt.Sprintf(
+				"Fatal config error: node.routes.ha.probe_timeout (%s) must be less than node.routes.ha.probe_interval (%s)\n",
+				haTimeout,
+				haInterval,
+			)
+		}
+	}
+
 	// Validate tuning parameters
 	if size := viper.GetInt("tuning.node_store_batch_size"); size <= 0 {
 		errorText += fmt.Sprintf(
@@ -1129,6 +1178,12 @@ func LoadServerConfig() (*Config, error) {
 			Ephemeral: EphemeralConfig{
 				InactivityTimeout: resolveEphemeralInactivityTimeout(),
 			},
+			Routes: RouteConfig{
+				HA: HARouteConfig{
+					ProbeInterval: viper.GetDuration("node.routes.ha.probe_interval"),
+					ProbeTimeout:  viper.GetDuration("node.routes.ha.probe_timeout"),
+				},
+			},
 		},
 
 		Database: databaseConfig(),