mirror of
https://github.com/juanfont/headscale.git
synced 2026-05-04 19:46:12 +02:00
state: switch consumers to NodeStore primary routes
Replace routes.PrimaryRoutes reads with NodeStore. Connect bumps SessionEpoch; Disconnect re-checks it inside UpdateNode so the check and mutation are atomic against a concurrent Connect on the same node. The connect_race regression test is carried in its final SessionEpoch form. Updates #3203
This commit is contained in:
parent
da927eb018
commit
437754aeea
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/juanfont/headscale/hscontrol/routes"
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"tailscale.com/net/tsaddr"
|
||||
"tailscale.com/tailcfg"
|
||||
@ -209,25 +208,30 @@ func TestTailNode(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
primary := routes.New()
|
||||
cfg := &types.Config{
|
||||
BaseDomain: tt.baseDomain,
|
||||
TailcfgDNSConfig: tt.dnsConfig,
|
||||
RandomizeClientPort: false,
|
||||
Taildrop: types.TaildropConfig{Enabled: true},
|
||||
}
|
||||
_ = primary.SetRoutes(tt.node.ID, tt.node.SubnetRoutes()...)
|
||||
|
||||
// This is a hack to avoid having a second node to test the primary route.
|
||||
// This should be baked into the test case proper if it is extended in the future.
|
||||
_ = primary.SetRoutes(2, netip.MustParsePrefix("192.168.0.0/24"))
|
||||
// Stub primary-route lookup: tt.node owns its SubnetRoutes,
|
||||
// node ID 2 owns 192.168.0.0/24 (a hack carried over from
|
||||
// the original routes-package-driven version of this test —
|
||||
// avoids spinning up a second node just to validate that
|
||||
// other nodes' primaries don't leak into tt.node's TailNode
|
||||
// output).
|
||||
primaries := map[types.NodeID][]netip.Prefix{
|
||||
tt.node.ID: tt.node.SubnetRoutes(),
|
||||
2: {netip.MustParsePrefix("192.168.0.0/24")},
|
||||
}
|
||||
nv := tt.node.View()
|
||||
got, err := nv.TailNode(
|
||||
0,
|
||||
func(id types.NodeID) []netip.Prefix {
|
||||
// Route function returns primaries + exit routes
|
||||
// (matching the real caller contract).
|
||||
return slices.Concat(primary.PrimaryRoutes(id), nv.ExitRoutes())
|
||||
return slices.Concat(primaries[id], nv.ExitRoutes())
|
||||
},
|
||||
cfg,
|
||||
)
|
||||
|
||||
150
hscontrol/servertest/connect_race_test.go
Normal file
150
hscontrol/servertest/connect_race_test.go
Normal file
@ -0,0 +1,150 @@
|
||||
package servertest_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/juanfont/headscale/hscontrol/servertest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"tailscale.com/tailcfg"
|
||||
)
|
||||
|
||||
// TestConnectDisconnectRace targets the residual TOCTOU window in
|
||||
// state.Disconnect: the connectGeneration check at state.go:644 is not
|
||||
// atomic with the subsequent NodeStore.UpdateNode and
|
||||
// primaryRoutes.SetRoutes calls. A new Connect that runs between the
|
||||
// gen check and the mutations can have its effects overwritten by the
|
||||
// stale Disconnect's SetRoutes(empty).
|
||||
//
|
||||
// The poll.go grace-period flow protects against the most common case
|
||||
// (RemoveNode + stillConnected). Connect/Disconnect on State directly
|
||||
// bypasses that protection and should still leave the state consistent
|
||||
// — if it doesn't, that is the bug behind issue #3203.
|
||||
//
|
||||
// Run with -race to also catch any data race exposed.
|
||||
func TestConnectDisconnectRace(t *testing.T) {
|
||||
srv := servertest.NewServer(t)
|
||||
user := srv.CreateUser(t, "race-user")
|
||||
|
||||
route := netip.MustParsePrefix("10.0.0.0/24")
|
||||
|
||||
// Use NewClient to get a node fully registered + Connected via the
|
||||
// real noise/poll path. After this, NodeStore + primaryRoutes already
|
||||
// have the node, and Connect has been called once.
|
||||
//
|
||||
// Only c2 advertises the route. PrimaryRoutes preserves a current
|
||||
// primary across changes (anti-flap, see primary.go), so if both
|
||||
// nodes were advertising, c1 (lower NodeID) would stay primary and
|
||||
// the test could never observe the route slipping out of c2's
|
||||
// PrimaryRoutes — it would never have been there in the first place.
|
||||
c1 := servertest.NewClient(t, srv, "race-r1", servertest.WithUser(user))
|
||||
c2 := servertest.NewClient(t, srv, "race-r2", servertest.WithUser(user))
|
||||
|
||||
c1.WaitForPeers(t, 1, 10*time.Second)
|
||||
|
||||
c2.Direct().SetHostinfo(&tailcfg.Hostinfo{
|
||||
BackendLogID: "servertest-race-r2",
|
||||
Hostname: "race-r2",
|
||||
RoutableIPs: []netip.Prefix{route},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_ = c2.Direct().SendUpdate(ctx)
|
||||
|
||||
cancel()
|
||||
|
||||
r2ID := findNodeID(t, srv, "race-r2")
|
||||
|
||||
_, ch, err := srv.State().SetApprovedRoutes(r2ID, []netip.Prefix{route})
|
||||
require.NoError(t, err)
|
||||
srv.App.Change(ch)
|
||||
|
||||
// Wait for advertisement + approval to be reflected as a primary
|
||||
// route assignment in PrimaryRoutes; otherwise we'd be racing the
|
||||
// initial steady-state setup, not the Connect/Disconnect window.
|
||||
require.Eventually(t, func() bool {
|
||||
return slices.Contains(srv.State().GetNodePrimaryRoutes(r2ID), route)
|
||||
}, 10*time.Second, 50*time.Millisecond,
|
||||
"primary route should be assigned to r2 before driving the race")
|
||||
|
||||
// Drive the race repeatedly. Each iteration:
|
||||
// 1. Call Connect(id) to obtain a fresh gen — this stands in for
|
||||
// a session that "owns" the node.
|
||||
// 2. Spawn a goroutine that issues Disconnect(id, gen) — the
|
||||
// stale deferred disconnect.
|
||||
// 3. Concurrently spawn a goroutine that issues Connect(id) —
|
||||
// the new session arriving.
|
||||
// 4. After both finish, check the state is consistent: the node
|
||||
// should be online and primaryRoutes should hold the approved
|
||||
// route for it.
|
||||
//
|
||||
// The two goroutines synchronise on a barrier so they start
|
||||
// approximately simultaneously, maximising the chance of hitting the
|
||||
// TOCTOU window.
|
||||
const iterations = 100
|
||||
|
||||
for i := range iterations {
|
||||
// Establish a "current session" with a known gen for r2.
|
||||
_, gen := srv.State().Connect(r2ID)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
start := make(chan struct{})
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
<-start
|
||||
|
||||
_, _ = srv.State().Disconnect(r2ID, gen)
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
<-start
|
||||
|
||||
_, _ = srv.State().Connect(r2ID)
|
||||
}()
|
||||
|
||||
close(start)
|
||||
wg.Wait()
|
||||
|
||||
// Post-condition: the node should be ONLINE (the new Connect's
|
||||
// effect must dominate, because the stale Disconnect ran with
|
||||
// an older gen and should have been a no-op — or its effects
|
||||
// must not have overtaken the new Connect's writes).
|
||||
nv, ok := srv.State().GetNodeByID(r2ID)
|
||||
if !assert.True(t, ok, "iteration %d: node should exist", i) {
|
||||
continue
|
||||
}
|
||||
|
||||
online, known := nv.IsOnline().GetOk()
|
||||
if !assert.True(t, known, "iteration %d: online status should be known", i) {
|
||||
continue
|
||||
}
|
||||
|
||||
assert.True(t, online,
|
||||
"iteration %d: node should be ONLINE after concurrent Connect+Disconnect (gen=%d)",
|
||||
i, gen)
|
||||
|
||||
// The approved route must still be reflected as a primary for r2.
|
||||
primary := srv.State().GetNodePrimaryRoutes(r2ID)
|
||||
assert.True(t, slices.Contains(primary, route),
|
||||
"iteration %d: r2 should hold primary for %s after concurrent Connect+Disconnect, got %v",
|
||||
i, route, primary)
|
||||
|
||||
if t.Failed() {
|
||||
t.Logf("primaryRoutes state at failure:\n%s",
|
||||
srv.State().PrimaryRoutesString())
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -72,7 +72,7 @@ func TestHAFailover_ViewerSeesPrimaryFlip(t *testing.T) {
|
||||
return hasPeerPrimaryRoute(nm, "dyn-flip-r1", route)
|
||||
})
|
||||
|
||||
changed := srv.State().PrimaryRoutes().SetNodeHealthy(id1, false)
|
||||
changed := srv.State().SetNodeUnhealthy(id1, true)
|
||||
require.True(t, changed, "marking primary unhealthy should change primaries")
|
||||
|
||||
srv.App.Change(change.PolicyChange())
|
||||
|
||||
@ -80,7 +80,7 @@ func TestHAHealthProbe_HealthyNodes(t *testing.T) {
|
||||
prober.ProbeOnce(ctx, srv.App.Change)
|
||||
|
||||
// Both nodes should be healthy, primary unchanged (node 1).
|
||||
assert.True(t, srv.State().PrimaryRoutes().IsNodeHealthy(nodeID1))
|
||||
assert.True(t, srv.State().IsNodeHealthy(nodeID1))
|
||||
|
||||
primaries := srv.State().GetNodePrimaryRoutes(nodeID1)
|
||||
assert.Contains(t, primaries, route)
|
||||
@ -111,7 +111,7 @@ func TestHAHealthProbe_UnhealthyFailover(t *testing.T) {
|
||||
require.Contains(t, primaries, route, "node 1 should be primary initially")
|
||||
|
||||
// Mark node 1 unhealthy — should failover to node 2.
|
||||
changed := srv.State().PrimaryRoutes().SetNodeHealthy(nodeID1, false)
|
||||
changed := srv.State().SetNodeUnhealthy(nodeID1, true)
|
||||
assert.True(t, changed, "marking primary unhealthy should change primaries")
|
||||
|
||||
primaries2 := srv.State().GetNodePrimaryRoutes(nodeID2)
|
||||
@ -141,12 +141,12 @@ func TestHAHealthProbe_RecoveryNoFlap(t *testing.T) {
|
||||
nodeID2 := advertiseAndApproveRoute(t, srv, c2, route)
|
||||
|
||||
// Failover: node 1 → node 2.
|
||||
srv.State().PrimaryRoutes().SetNodeHealthy(nodeID1, false)
|
||||
srv.State().SetNodeUnhealthy(nodeID1, true)
|
||||
primaries := srv.State().GetNodePrimaryRoutes(nodeID2)
|
||||
require.Contains(t, primaries, route, "node 2 should be primary")
|
||||
|
||||
// Recovery: node 1 healthy again. Node 2 should STAY primary.
|
||||
changed := srv.State().PrimaryRoutes().SetNodeHealthy(nodeID1, true)
|
||||
changed := srv.State().SetNodeUnhealthy(nodeID1, false)
|
||||
assert.False(t, changed, "recovery should not change primaries (no flap)")
|
||||
|
||||
primaries = srv.State().GetNodePrimaryRoutes(nodeID2)
|
||||
@ -173,8 +173,8 @@ func TestHAHealthProbe_ConnectClearsUnhealthy(t *testing.T) {
|
||||
advertiseAndApproveRoute(t, srv, c2, route)
|
||||
|
||||
// Mark unhealthy.
|
||||
srv.State().PrimaryRoutes().SetNodeHealthy(nodeID1, false)
|
||||
assert.False(t, srv.State().PrimaryRoutes().IsNodeHealthy(nodeID1))
|
||||
srv.State().SetNodeUnhealthy(nodeID1, true)
|
||||
assert.False(t, srv.State().IsNodeHealthy(nodeID1))
|
||||
|
||||
// Reconnect clears unhealthy via State.Connect → ClearUnhealthy.
|
||||
c1.Disconnect(t)
|
||||
@ -182,7 +182,7 @@ func TestHAHealthProbe_ConnectClearsUnhealthy(t *testing.T) {
|
||||
|
||||
c1.WaitForPeers(t, 1, 10*time.Second)
|
||||
|
||||
assert.True(t, srv.State().PrimaryRoutes().IsNodeHealthy(nodeID1),
|
||||
assert.True(t, srv.State().IsNodeHealthy(nodeID1),
|
||||
"reconnect should clear unhealthy state")
|
||||
}
|
||||
|
||||
|
||||
@ -2,11 +2,12 @@ package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
hsdb "github.com/juanfont/headscale/hscontrol/db"
|
||||
"github.com/juanfont/headscale/hscontrol/routes"
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"tailscale.com/tailcfg"
|
||||
)
|
||||
@ -132,8 +133,10 @@ func (s *State) DebugOverview() string {
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Route information
|
||||
routeCount := len(strings.Split(strings.TrimSpace(s.primaryRoutes.String()), "\n"))
|
||||
if s.primaryRoutes.String() == "" {
|
||||
primaryStr := s.PrimaryRoutesString()
|
||||
|
||||
routeCount := len(strings.Split(strings.TrimSpace(primaryStr), "\n"))
|
||||
if primaryStr == "" {
|
||||
routeCount = 0
|
||||
}
|
||||
|
||||
@ -253,9 +256,55 @@ func (s *State) DebugFilter() ([]tailcfg.FilterRule, error) {
|
||||
return filter, nil
|
||||
}
|
||||
|
||||
// DebugRoutes returns the current primary routes information as a structured object.
|
||||
func (s *State) DebugRoutes() routes.DebugRoutes {
|
||||
return s.primaryRoutes.DebugJSON()
|
||||
// DebugRoutes returns the current primary routes information as a
|
||||
// structured object built from the NodeStore snapshot.
|
||||
func (s *State) DebugRoutes() types.DebugRoutes {
|
||||
debug := types.DebugRoutes{
|
||||
AvailableRoutes: make(map[types.NodeID][]netip.Prefix),
|
||||
PrimaryRoutes: make(map[string]types.NodeID),
|
||||
}
|
||||
|
||||
for _, nv := range s.nodeStore.ListNodes().All() {
|
||||
if !nv.Valid() {
|
||||
continue
|
||||
}
|
||||
|
||||
online, known := nv.IsOnline().GetOk()
|
||||
if !known || !online {
|
||||
continue
|
||||
}
|
||||
|
||||
approved := nv.AllApprovedRoutes()
|
||||
if len(approved) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
slices.SortFunc(approved, netip.Prefix.Compare)
|
||||
debug.AvailableRoutes[nv.ID()] = approved
|
||||
}
|
||||
|
||||
for prefix, id := range s.nodeStore.PrimaryRoutes() {
|
||||
debug.PrimaryRoutes[prefix.String()] = id
|
||||
}
|
||||
|
||||
var unhealthy []types.NodeID
|
||||
|
||||
for _, nv := range s.nodeStore.ListNodes().All() {
|
||||
if !nv.Valid() {
|
||||
continue
|
||||
}
|
||||
|
||||
if !s.nodeStore.IsNodeHealthy(nv.ID()) {
|
||||
unhealthy = append(unhealthy, nv.ID())
|
||||
}
|
||||
}
|
||||
|
||||
if len(unhealthy) > 0 {
|
||||
slices.Sort(unhealthy)
|
||||
debug.UnhealthyNodes = unhealthy
|
||||
}
|
||||
|
||||
return debug
|
||||
}
|
||||
|
||||
// DebugRoutesString returns the current primary routes information as a string.
|
||||
@ -322,8 +371,10 @@ func (s *State) DebugOverviewJSON() DebugOverviewInfo {
|
||||
}
|
||||
|
||||
// Route information
|
||||
routeCount := len(strings.Split(strings.TrimSpace(s.primaryRoutes.String()), "\n"))
|
||||
if s.primaryRoutes.String() == "" {
|
||||
primaryStr := s.PrimaryRoutesString()
|
||||
|
||||
routeCount := len(strings.Split(strings.TrimSpace(primaryStr), "\n"))
|
||||
if primaryStr == "" {
|
||||
routeCount = 0
|
||||
}
|
||||
|
||||
|
||||
@ -46,7 +46,7 @@ func (p *HAHealthProber) ProbeOnce(
|
||||
ctx context.Context,
|
||||
dispatch func(...change.Change),
|
||||
) {
|
||||
haNodes := p.state.primaryRoutes.HANodes()
|
||||
haNodes := p.state.nodeStore.HANodes()
|
||||
if len(haNodes) == 0 {
|
||||
return
|
||||
}
|
||||
@ -97,7 +97,7 @@ func (p *HAHealthProber) ProbeOnce(
|
||||
Dur("latency", latency).
|
||||
Msg("HA probe: node responded")
|
||||
|
||||
if p.state.primaryRoutes.SetNodeHealthy(id, true) {
|
||||
if p.state.SetNodeUnhealthy(id, false) {
|
||||
dispatch(change.PolicyChange())
|
||||
|
||||
log.Info().
|
||||
@ -121,7 +121,7 @@ func (p *HAHealthProber) ProbeOnce(
|
||||
Dur("timeout", p.cfg.ProbeTimeout).
|
||||
Msg("HA probe: node did not respond")
|
||||
|
||||
if p.state.primaryRoutes.SetNodeHealthy(id, false) {
|
||||
if p.state.SetNodeUnhealthy(id, true) {
|
||||
dispatch(change.PolicyChange())
|
||||
|
||||
log.Info().
|
||||
|
||||
@ -916,6 +916,36 @@ func (s *NodeStore) IsNodeHealthy(id types.NodeID) bool {
|
||||
return !n.Unhealthy
|
||||
}
|
||||
|
||||
// PrimaryRoutes returns the snapshot's prefix→primary map. The map is
|
||||
// owned by the snapshot and must not be mutated; it is safe to read
|
||||
// concurrently because snapshots are immutable once published.
|
||||
func (s *NodeStore) PrimaryRoutes() map[netip.Prefix]types.NodeID {
|
||||
return s.data.Load().routes
|
||||
}
|
||||
|
||||
// PrimaryRoutesString renders the snapshot's prefix→primary map for
|
||||
// debug output and test diagnostics.
|
||||
func (s *NodeStore) PrimaryRoutesString() string {
|
||||
snap := s.data.Load()
|
||||
if len(snap.routes) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
prefixes := make([]netip.Prefix, 0, len(snap.routes))
|
||||
for p := range snap.routes {
|
||||
prefixes = append(prefixes, p)
|
||||
}
|
||||
|
||||
slices.SortFunc(prefixes, netip.Prefix.Compare)
|
||||
|
||||
var b strings.Builder
|
||||
for _, p := range prefixes {
|
||||
fmt.Fprintf(&b, "%s: %d\n", p, snap.routes[p])
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// RebuildPeerMaps rebuilds the peer relationship map using the current peersFunc.
|
||||
// This must be called after policy changes because peersFunc uses PolicyManager's
|
||||
// filters to determine which nodes can see each other. Without rebuilding, the
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strconv"
|
||||
@ -27,7 +28,6 @@ import (
|
||||
hsdb "github.com/juanfont/headscale/hscontrol/db"
|
||||
"github.com/juanfont/headscale/hscontrol/policy"
|
||||
"github.com/juanfont/headscale/hscontrol/policy/matcher"
|
||||
"github.com/juanfont/headscale/hscontrol/routes"
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"github.com/juanfont/headscale/hscontrol/types/change"
|
||||
"github.com/juanfont/headscale/hscontrol/util"
|
||||
@ -146,15 +146,6 @@ type State struct {
|
||||
// via the eviction callback so any waiting goroutines wake.
|
||||
authCache *expirable.LRU[types.AuthID, *types.AuthRequest]
|
||||
|
||||
// primaryRoutes tracks primary route assignments for nodes
|
||||
primaryRoutes *routes.PrimaryRoutes
|
||||
|
||||
// connectGen tracks a per-node monotonic generation counter so stale
|
||||
// Disconnect() calls from old poll sessions are rejected. Connect()
|
||||
// increments the counter and returns the current value; Disconnect()
|
||||
// only proceeds when the generation it carries matches the latest.
|
||||
connectGen sync.Map // types.NodeID → *atomic.Uint64
|
||||
|
||||
// pings tracks pending ping requests and their response channels.
|
||||
pings *pingTracker
|
||||
|
||||
@ -262,13 +253,12 @@ func NewState(cfg *types.Config) (*State, error) {
|
||||
return &State{
|
||||
cfg: cfg,
|
||||
|
||||
db: db,
|
||||
ipAlloc: ipAlloc,
|
||||
polMan: polMan,
|
||||
authCache: authCache,
|
||||
primaryRoutes: routes.New(),
|
||||
nodeStore: nodeStore,
|
||||
pings: newPingTracker(),
|
||||
db: db,
|
||||
ipAlloc: ipAlloc,
|
||||
polMan: polMan,
|
||||
authCache: authCache,
|
||||
nodeStore: nodeStore,
|
||||
pings: newPingTracker(),
|
||||
|
||||
sshCheckAuth: make(map[sshCheckPair]time.Time),
|
||||
}, nil
|
||||
@ -559,101 +549,55 @@ func (s *State) DeleteNode(node types.NodeView) (change.Change, error) {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Connect marks a node as connected and updates its primary routes in the state.
|
||||
// It returns the list of changes and a generation number. The generation number
|
||||
// must be passed to Disconnect() so that stale disconnects from old poll sessions
|
||||
// are rejected (see the grace period logic in poll.go).
|
||||
// Connect marks a node connected and returns the resulting changes
|
||||
// plus a session epoch. The caller must pass the epoch back to
|
||||
// Disconnect so deferred grace-period disconnects from a previous
|
||||
// poll session are dropped (see poll.go).
|
||||
func (s *State) Connect(id types.NodeID) ([]change.Change, uint64) {
|
||||
// Increment the connect generation for this node. This ensures that any
|
||||
// in-flight Disconnect() from a previous session will see a stale generation
|
||||
// and become a no-op.
|
||||
gen := s.nextConnectGen(id)
|
||||
prevRoutes := s.nodeStore.PrimaryRoutes()
|
||||
|
||||
// Update online status in NodeStore before creating change notification
|
||||
// so the NodeStore already reflects the correct state when other nodes
|
||||
// process the NodeCameOnline change for full map generation.
|
||||
// Reconnecting clears Unhealthy: the node just proved basic
|
||||
// connectivity by completing the Noise handshake.
|
||||
var epoch uint64
|
||||
node, ok := s.nodeStore.UpdateNode(id, func(n *types.Node) {
|
||||
n.SessionEpoch++
|
||||
epoch = n.SessionEpoch
|
||||
n.IsOnline = new(true)
|
||||
// n.LastSeen = ptr.To(now)
|
||||
n.Unhealthy = false
|
||||
})
|
||||
if !ok {
|
||||
return nil, gen
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
c := []change.Change{change.NodeOnlineFor(node)}
|
||||
|
||||
log.Info().EmbedObject(node).Msg("node connected")
|
||||
|
||||
// Reconnecting clears any prior unhealthy state — the node proved
|
||||
// basic connectivity by establishing the Noise session.
|
||||
s.primaryRoutes.ClearUnhealthy(id)
|
||||
|
||||
// Use the node's current routes for primary route update.
|
||||
// AllApprovedRoutes() returns only the intersection of announced and approved routes.
|
||||
routeChange := s.primaryRoutes.SetRoutes(id, node.AllApprovedRoutes()...)
|
||||
|
||||
if routeChange {
|
||||
if !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) {
|
||||
c = append(c, change.NodeAdded(id))
|
||||
}
|
||||
|
||||
// Mirror Disconnect: a node coming online may (re)enable cap/relay
|
||||
// grants targeting it, reintroduce identity-based aliases that
|
||||
// resolve to its tags/IPs, and so on. Always trigger a PolicyChange
|
||||
// so peers can recompute their netmap and pick up any policy
|
||||
// elements that depend on this node being present.
|
||||
// Coming online may re-enable cap/relay grants and identity-based
|
||||
// aliases targeting this node, so peers need a fresh netmap.
|
||||
c = append(c, change.PolicyChange())
|
||||
|
||||
return c, gen
|
||||
return c, epoch
|
||||
}
|
||||
|
||||
// nextConnectGen atomically increments and returns the connect generation for a node.
|
||||
func (s *State) nextConnectGen(id types.NodeID) uint64 {
|
||||
val, _ := s.connectGen.LoadOrStore(id, &atomic.Uint64{})
|
||||
|
||||
counter, ok := val.(*atomic.Uint64)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
|
||||
return counter.Add(1)
|
||||
}
|
||||
|
||||
// connectGeneration returns the current connect generation for a node.
|
||||
func (s *State) connectGeneration(id types.NodeID) uint64 {
|
||||
val, ok := s.connectGen.Load(id)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
|
||||
counter, ok := val.(*atomic.Uint64)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
|
||||
return counter.Load()
|
||||
}
|
||||
|
||||
// Disconnect marks a node as disconnected and updates its primary routes in the state.
|
||||
// The gen parameter is the generation returned by Connect(). If a newer Connect() has
|
||||
// been called since the session that is disconnecting, the generation will not match
|
||||
// and this call becomes a no-op, preventing stale disconnects from overwriting the
|
||||
// online status set by a newer session.
|
||||
func (s *State) Disconnect(id types.NodeID, gen uint64) ([]change.Change, error) {
|
||||
// Check if this disconnect is stale. A newer Connect() will have incremented
|
||||
// the generation, so if ours doesn't match, a newer session owns this node.
|
||||
if current := s.connectGeneration(id); current != gen {
|
||||
log.Debug().
|
||||
Uint64("disconnect_gen", gen).
|
||||
Uint64("current_gen", current).
|
||||
Msg("stale disconnect rejected, newer session active")
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Disconnect marks the node offline. epoch must match the value
|
||||
// Connect returned for this session; otherwise the call no-ops so a
|
||||
// deferred disconnect from an older session cannot overwrite state set
|
||||
// by a newer Connect. The check and the IsOnline write share an
|
||||
// UpdateNode closure, making them atomic against concurrent Connects.
|
||||
func (s *State) Disconnect(id types.NodeID, epoch uint64) ([]change.Change, error) {
|
||||
var stale bool
|
||||
node, ok := s.nodeStore.UpdateNode(id, func(n *types.Node) {
|
||||
if n.SessionEpoch != epoch {
|
||||
stale = true
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
n.LastSeen = &now
|
||||
// NodeStore is the source of truth for all node state including online status.
|
||||
n.IsOnline = new(false)
|
||||
})
|
||||
|
||||
@ -661,29 +605,29 @@ func (s *State) Disconnect(id types.NodeID, gen uint64) ([]change.Change, error)
|
||||
return nil, fmt.Errorf("%w: %d", ErrNodeNotFound, id)
|
||||
}
|
||||
|
||||
if stale {
|
||||
log.Debug().
|
||||
Uint64("disconnect_epoch", epoch).
|
||||
Uint64("current_epoch", node.SessionEpoch()).
|
||||
Msg("stale disconnect rejected, newer session active")
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Info().EmbedObject(node).Msg("node disconnected")
|
||||
|
||||
// Special error handling for disconnect - we log errors but continue
|
||||
// because NodeStore is already updated and we need to notify peers
|
||||
// Persist LastSeen best-effort: NodeStore already reflects offline
|
||||
// and peers still need the change notifications below.
|
||||
_, c, err := s.persistNodeToDB(node)
|
||||
if err != nil {
|
||||
// Log error but don't fail the disconnection - NodeStore is already updated
|
||||
// and we need to send change notifications to peers
|
||||
log.Error().Err(err).EmbedObject(node).Msg("failed to update last seen in database")
|
||||
|
||||
c = change.Change{}
|
||||
}
|
||||
|
||||
// The node is disconnecting so make sure that none of the routes it
|
||||
// announced are served to any nodes.
|
||||
s.primaryRoutes.SetRoutes(id)
|
||||
|
||||
// A node going offline can affect policy compilation in ways beyond
|
||||
// subnet routes: cap/relay grants targeting this node, identity-based
|
||||
// aliases (tags, groups, users) that reference its tags/IPs, via
|
||||
// routes steered through it, and so on. Always trigger a PolicyChange
|
||||
// so peers receive a recomputed netmap and drop any cached state
|
||||
// derived from this node (including peer relay allocations).
|
||||
// Going offline can affect policy compilation beyond subnet routes
|
||||
// (cap/relay grants, tag/group aliases, via routes), so peers need
|
||||
// a fresh netmap regardless of whether the primary moved.
|
||||
//
|
||||
// TODO(kradalby): fires one full netmap recompute per peer on
|
||||
// every connect/disconnect. Coalesce in mapper/batcher.go:addToBatch.
|
||||
@ -951,6 +895,8 @@ func (s *State) SetApprovedRoutes(nodeID types.NodeID, routes []netip.Prefix) (t
|
||||
// TODO(kradalby): In principle we should call the AutoApprove logic here
|
||||
// because even if the CLI removes an auto-approved route, it will be added
|
||||
// back automatically.
|
||||
prevRoutes := s.nodeStore.PrimaryRoutes()
|
||||
|
||||
n, ok := s.nodeStore.UpdateNode(nodeID, func(node *types.Node) {
|
||||
node.ApprovedRoutes = routes
|
||||
})
|
||||
@ -965,13 +911,9 @@ func (s *State) SetApprovedRoutes(nodeID types.NodeID, routes []netip.Prefix) (t
|
||||
return types.NodeView{}, change.Change{}, err
|
||||
}
|
||||
|
||||
// Update primary routes table based on SubnetRoutes (intersection of announced and approved).
|
||||
// The primary routes table is what the mapper uses to generate network maps, so updating it
|
||||
// here ensures that route changes are distributed to peers.
|
||||
routeChange := s.primaryRoutes.SetRoutes(nodeID, nodeView.AllApprovedRoutes()...)
|
||||
|
||||
// If routes changed or the changeset isn't already a full update, trigger a policy change
|
||||
// to ensure all nodes get updated network maps
|
||||
// PolicyChange fans out a fresh netmap whenever the new approved
|
||||
// set shifted a primary advertiser.
|
||||
routeChange := !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes())
|
||||
if routeChange || !c.IsFull() {
|
||||
c = change.PolicyChange()
|
||||
}
|
||||
@ -1157,20 +1099,9 @@ func (s *State) SetPolicyInDB(data string) (*types.Policy, error) {
|
||||
return s.db.SetPolicy(data)
|
||||
}
|
||||
|
||||
// SetNodeRoutes sets the primary routes for a node.
|
||||
func (s *State) SetNodeRoutes(nodeID types.NodeID, routes ...netip.Prefix) change.Change {
|
||||
if s.primaryRoutes.SetRoutes(nodeID, routes...) {
|
||||
// Route changes affect packet filters for all nodes, so trigger a policy change
|
||||
// to ensure filters are regenerated across the entire network
|
||||
return change.PolicyChange()
|
||||
}
|
||||
|
||||
return change.Change{}
|
||||
}
|
||||
|
||||
// GetNodePrimaryRoutes returns the primary routes for a node.
|
||||
func (s *State) GetNodePrimaryRoutes(nodeID types.NodeID) []netip.Prefix {
|
||||
return s.primaryRoutes.PrimaryRoutes(nodeID)
|
||||
return s.nodeStore.PrimaryRoutesForNode(nodeID)
|
||||
}
|
||||
|
||||
// RoutesForPeer computes the routes a peer should advertise in a viewer's
|
||||
@ -1185,7 +1116,7 @@ func (s *State) RoutesForPeer(
|
||||
matchers []matcher.Match,
|
||||
) []netip.Prefix {
|
||||
viaResult := s.polMan.ViaRoutesForPeer(viewer, peer)
|
||||
globalPrimaries := s.primaryRoutes.PrimaryRoutes(peer.ID())
|
||||
globalPrimaries := s.nodeStore.PrimaryRoutesForNode(peer.ID())
|
||||
exitRoutes := peer.ExitRoutes()
|
||||
|
||||
var reduced []netip.Prefix
|
||||
@ -1245,14 +1176,47 @@ func (s *State) RoutesForPeer(
|
||||
return reduced
|
||||
}
|
||||
|
||||
// PrimaryRoutes returns the primary routes tracker.
|
||||
func (s *State) PrimaryRoutes() *routes.PrimaryRoutes {
|
||||
return s.primaryRoutes
|
||||
// PrimaryRoutesString renders the current prefix→primary assignment
|
||||
// for diagnostics.
|
||||
func (s *State) PrimaryRoutesString() string {
|
||||
return s.nodeStore.PrimaryRoutesString()
|
||||
}
|
||||
|
||||
// PrimaryRoutesString returns a string representation of all primary routes.
|
||||
func (s *State) PrimaryRoutesString() string {
|
||||
return s.primaryRoutes.String()
|
||||
// IsNodeHealthy reports the HA prober's view of id. Unknown nodes
|
||||
// report healthy.
|
||||
func (s *State) IsNodeHealthy(id types.NodeID) bool {
|
||||
return s.nodeStore.IsNodeHealthy(id)
|
||||
}
|
||||
|
||||
// SetNodeUnhealthy flips the runtime Unhealthy bit and reports whether
|
||||
// the resulting primary route assignment changed, so the HA prober can
|
||||
// decide whether to fan out a PolicyChange.
|
||||
//
|
||||
// A request to mark a node Unhealthy is dropped if the node is no
|
||||
// longer an HA candidate (offline or no approved routes). The prober
|
||||
// reads HANodes() at the start of a probe cycle and writes back after
|
||||
// the timeout fires; in that window the node may have left the
|
||||
// candidate set, and the bit would just be stale. The check happens
|
||||
// inside the writer goroutine so it serialises against the
|
||||
// SetApprovedRoutes / Disconnect that removed candidacy.
|
||||
func (s *State) SetNodeUnhealthy(id types.NodeID, unhealthy bool) bool {
|
||||
prevRoutes := s.nodeStore.PrimaryRoutes()
|
||||
|
||||
_, ok := s.nodeStore.UpdateNode(id, func(n *types.Node) {
|
||||
if unhealthy {
|
||||
online := n.IsOnline != nil && *n.IsOnline
|
||||
if !online || len(n.AllApprovedRoutes()) == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n.Unhealthy = unhealthy
|
||||
})
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes())
|
||||
}
|
||||
|
||||
// ValidateAPIKey checks if an API key is valid and active.
|
||||
@ -2487,6 +2451,10 @@ func (s *State) UpdateNodeFromMapRequest(id types.NodeID, req tailcfg.MapRequest
|
||||
endpointChanged bool
|
||||
derpChanged bool
|
||||
)
|
||||
// Snapshot the primary assignment so we can tell whether the
|
||||
// Hostinfo + auto-approval that follows shifted any prefix.
|
||||
prevRoutes := s.nodeStore.PrimaryRoutes()
|
||||
|
||||
// We need to ensure we update the node as it is in the NodeStore at
|
||||
// the time of the request.
|
||||
updatedNode, ok := s.nodeStore.UpdateNode(id, func(currentNode *types.Node) {
|
||||
@ -2618,10 +2586,22 @@ func (s *State) UpdateNodeFromMapRequest(id types.NodeID, req tailcfg.MapRequest
|
||||
}
|
||||
} // Continue with the rest of the processing using the updated node
|
||||
|
||||
// Handle route changes after NodeStore update.
|
||||
// Update routes if announced routes changed (even if approved routes stayed the same)
|
||||
// because SubnetRoutes is the intersection of announced AND approved routes.
|
||||
nodeRouteChange := s.maybeUpdateNodeRoutes(id, updatedNode, hostinfoChanged, needsRouteApproval, routeChange, req.Hostinfo)
|
||||
// SubnetRoutes = announced ∩ approved, so a Hostinfo update can
|
||||
// move a primary without ever touching ApprovedRoutes. The pre/post
|
||||
// snapshot diff catches that.
|
||||
nodeRouteChange := change.Change{}
|
||||
|
||||
if !maps.Equal(prevRoutes, s.nodeStore.PrimaryRoutes()) {
|
||||
log.Debug().
|
||||
Caller().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Strs(zf.RoutesAnnounced, util.PrefixesToString(updatedNode.AnnouncedRoutes())).
|
||||
Strs(zf.ApprovedRoutes, util.PrefixesToString(updatedNode.ApprovedRoutes().AsSlice())).
|
||||
Strs(zf.AllApprovedRoutes, util.PrefixesToString(updatedNode.AllApprovedRoutes())).
|
||||
Msg("primary route assignment shifted after MapRequest")
|
||||
|
||||
nodeRouteChange = change.PolicyChange()
|
||||
}
|
||||
|
||||
_, policyChange, err := s.persistNodeToDB(updatedNode)
|
||||
if err != nil {
|
||||
@ -2715,34 +2695,3 @@ func peerChangeEmpty(peerChange tailcfg.PeerChange) bool {
|
||||
peerChange.LastSeen == nil &&
|
||||
peerChange.KeyExpiry == nil
|
||||
}
|
||||
|
||||
// maybeUpdateNodeRoutes updates node routes if announced routes changed but approved routes didn't.
|
||||
// This is needed because SubnetRoutes is the intersection of announced AND approved routes.
|
||||
func (s *State) maybeUpdateNodeRoutes(
|
||||
id types.NodeID,
|
||||
node types.NodeView,
|
||||
hostinfoChanged, needsRouteApproval, routeChange bool,
|
||||
hostinfo *tailcfg.Hostinfo,
|
||||
) change.Change {
|
||||
// Only update if announced routes changed without approval change
|
||||
if !hostinfoChanged || !needsRouteApproval || routeChange || hostinfo == nil {
|
||||
return change.Change{}
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Caller().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Msg("updating routes because announced routes changed but approved routes did not")
|
||||
|
||||
// SetNodeRoutes sets the active/distributed routes using AllApprovedRoutes()
|
||||
// which returns only the intersection of announced AND approved routes.
|
||||
log.Debug().
|
||||
Caller().
|
||||
Uint64(zf.NodeID, id.Uint64()).
|
||||
Strs(zf.RoutesAnnounced, util.PrefixesToString(node.AnnouncedRoutes())).
|
||||
Strs(zf.ApprovedRoutes, util.PrefixesToString(node.ApprovedRoutes().AsSlice())).
|
||||
Strs(zf.AllApprovedRoutes, util.PrefixesToString(node.AllApprovedRoutes())).
|
||||
Msg("updating node routes for distribution")
|
||||
|
||||
return s.SetNodeRoutes(id, node.AllApprovedRoutes()...)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user