From 63f7a400a8fbe89eaa9b2ba559a4300df842fcc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20Lensb=C3=B8l?= Date: Tue, 7 Oct 2025 09:30:27 -0400 Subject: [PATCH] wgengine/{magicsock,userspace,router}: move portupdates to the eventbus (#17423) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also pull out interface method only needed in Linux. Instead of having userspace do the call into the router, just let the router pick up the change itself. Updates #15160 Signed-off-by: Claus Lensbøl --- wgengine/magicsock/magicsock.go | 19 ++- wgengine/router/callback.go | 7 - wgengine/router/osrouter/router_linux.go | 122 ++++++++++-------- wgengine/router/osrouter/router_openbsd.go | 7 - wgengine/router/osrouter/router_plan9.go | 7 - .../router/osrouter/router_userspace_bsd.go | 7 - wgengine/router/osrouter/router_windows.go | 7 - wgengine/router/router.go | 16 +-- wgengine/router/router_fake.go | 5 - wgengine/userspace.go | 8 -- 10 files changed, 83 insertions(+), 122 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 112085053..c7d07c277 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -67,6 +67,7 @@ import ( "tailscale.com/util/testenv" "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" + "tailscale.com/wgengine/router" "tailscale.com/wgengine/wgint" ) @@ -179,6 +180,7 @@ type Conn struct { // config changes between magicsock and wireguard. syncPub *eventbus.Publisher[syncPoint] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] + portUpdatePub *eventbus.Publisher[router.PortUpdate] // pconn4 and pconn6 are the underlying UDP sockets used to // send/receive packets for wireguard and other magicsock @@ -393,10 +395,6 @@ type Conn struct { // wgPinger is the WireGuard only pinger used for latency measurements. wgPinger lazy.SyncValue[*ping.Pinger] - // onPortUpdate is called with the new port when magicsock rebinds to - // a new port. - onPortUpdate func(port uint16, network string) - // getPeerByKey optionally specifies a function to look up a peer's // wireguard state by its public key. If nil, it's not used. getPeerByKey func(key.NodePublic) (_ wgint.Peer, ok bool) @@ -492,10 +490,6 @@ type Options struct { // If nil, they're ignored and not updated. ControlKnobs *controlknobs.Knobs - // OnPortUpdate is called with the new port when magicsock rebinds to - // a new port. - OnPortUpdate func(port uint16, network string) - // PeerByKeyFunc optionally specifies a function to look up a peer's // WireGuard state by its public key. If nil, it's not used. // In regular use, this will be wgengine.(*userspaceEngine).PeerByKey. @@ -735,6 +729,7 @@ func NewConn(opts Options) (*Conn, error) { cli := c.eventBus.Client("magicsock.Conn") c.syncPub = eventbus.Publish[syncPoint](cli) c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) + c.portUpdatePub = eventbus.Publish[router.PortUpdate](cli) c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) @@ -759,7 +754,6 @@ func NewConn(opts Options) (*Conn, error) { c.netMon = opts.NetMon c.health = opts.HealthTracker - c.onPortUpdate = opts.OnPortUpdate c.getPeerByKey = opts.PeerByKeyFunc if err := c.rebind(keepCurrentPort); err != nil { @@ -3533,7 +3527,7 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur c.logf("magicsock: unable to bind %v port %d: %v", network, port, err) continue } - if c.onPortUpdate != nil { + if c.portUpdatePub.ShouldPublish() { _, gotPortStr, err := net.SplitHostPort(pconn.LocalAddr().String()) if err != nil { c.logf("could not parse port from %s: %w", pconn.LocalAddr().String(), err) @@ -3542,7 +3536,10 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur if err != nil { c.logf("could not parse port from %s: %w", gotPort, err) } else { - c.onPortUpdate(uint16(gotPort), network) + c.portUpdatePub.Publish(router.PortUpdate{ + UDPPort: uint16(gotPort), + EndpointNetwork: network, + }) } } } diff --git a/wgengine/router/callback.go b/wgengine/router/callback.go index 1d9091277..c1838539b 100644 --- a/wgengine/router/callback.go +++ b/wgengine/router/callback.go @@ -56,13 +56,6 @@ func (r *CallbackRouter) Set(rcfg *Config) error { return r.SetBoth(r.rcfg, r.dcfg) } -// UpdateMagicsockPort implements the Router interface. This implementation -// does nothing and returns nil because this router does not currently need -// to know what the magicsock UDP port is. -func (r *CallbackRouter) UpdateMagicsockPort(_ uint16, _ string) error { - return nil -} - // SetDNS implements dns.OSConfigurator. func (r *CallbackRouter) SetDNS(dcfg dns.OSConfig) error { r.mu.Lock() diff --git a/wgengine/router/osrouter/router_linux.go b/wgengine/router/osrouter/router_linux.go index cf1a9f027..835a9050f 100644 --- a/wgengine/router/osrouter/router_linux.go +++ b/wgengine/router/osrouter/router_linux.go @@ -14,6 +14,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -54,21 +55,14 @@ const ( ) type linuxRouter struct { - closed atomic.Bool - logf func(fmt string, args ...any) - tunname string - netMon *netmon.Monitor - health *health.Tracker - eventSubs eventbus.Monitor - rulesAddedPub *eventbus.Publisher[AddIPRules] - unregNetMon func() - addrs map[netip.Prefix]bool - routes map[netip.Prefix]bool - localRoutes map[netip.Prefix]bool - snatSubnetRoutes bool - statefulFiltering bool - netfilterMode preftype.NetfilterMode - netfilterKind string + closed atomic.Bool + logf func(fmt string, args ...any) + tunname string + netMon *netmon.Monitor + health *health.Tracker + eventSubs eventbus.Monitor + rulesAddedPub *eventbus.Publisher[AddIPRules] + unregNetMon func() // ruleRestorePending is whether a timer has been started to // restore deleted ip rules. @@ -86,8 +80,16 @@ type linuxRouter struct { cmd commandRunner nfr linuxfw.NetfilterRunner - magicsockPortV4 atomic.Uint32 // actually a uint16 - magicsockPortV6 atomic.Uint32 // actually a uint16 + mu sync.Mutex + addrs map[netip.Prefix]bool + routes map[netip.Prefix]bool + localRoutes map[netip.Prefix]bool + snatSubnetRoutes bool + statefulFiltering bool + netfilterMode preftype.NetfilterMode + netfilterKind string + magicsockPortV4 uint16 + magicsockPortV6 uint16 } func newUserspaceRouter(logf logger.Logf, tunDev tun.Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus) (router.Router, error) { @@ -169,6 +171,7 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon // [eventbus.Client] is closed. func (r *linuxRouter) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { ruleDeletedSub := eventbus.Subscribe[netmon.RuleDeleted](ec) + portUpdateSub := eventbus.Subscribe[router.PortUpdate](ec) return func(ec *eventbus.Client) { for { select { @@ -176,6 +179,11 @@ func (r *linuxRouter) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus. return case rs := <-ruleDeletedSub.Events(): r.onIPRuleDeleted(rs.Table, rs.Priority) + case pu := <-portUpdateSub.Events(): + r.logf("portUpdate(port=%v, network=%s)", pu.UDPPort, pu.EndpointNetwork) + if err := r.updateMagicsockPort(pu.UDPPort, pu.EndpointNetwork); err != nil { + r.logf("updateMagicsockPort(port=%v, network=%s) failed: %v", pu.UDPPort, pu.EndpointNetwork, err) + } } } } @@ -355,7 +363,9 @@ func (r *linuxRouter) onIPRuleDeleted(table uint8, priority uint32) { } func (r *linuxRouter) Up() error { - if err := r.setNetfilterMode(netfilterOff); err != nil { + r.mu.Lock() + defer r.mu.Unlock() + if err := r.setNetfilterModeLocked(netfilterOff); err != nil { return fmt.Errorf("setting netfilter mode: %w", err) } if err := r.addIPRules(); err != nil { @@ -369,6 +379,8 @@ func (r *linuxRouter) Up() error { } func (r *linuxRouter) Close() error { + r.mu.Lock() + defer r.mu.Unlock() r.closed.Store(true) if r.unregNetMon != nil { r.unregNetMon() @@ -380,7 +392,7 @@ func (r *linuxRouter) Close() error { if err := r.delIPRules(); err != nil { return err } - if err := r.setNetfilterMode(netfilterOff); err != nil { + if err := r.setNetfilterModeLocked(netfilterOff); err != nil { return err } if err := r.delRoutes(); err != nil { @@ -394,10 +406,10 @@ func (r *linuxRouter) Close() error { return nil } -// setupNetfilter initializes the NetfilterRunner in r.nfr. It expects r.nfr +// setupNetfilterLocked initializes the NetfilterRunner in r.nfr. It expects r.nfr // to be nil, or the current netfilter to be set to netfilterOff. // kind should be either a linuxfw.FirewallMode, or the empty string for auto. -func (r *linuxRouter) setupNetfilter(kind string) error { +func (r *linuxRouter) setupNetfilterLocked(kind string) error { r.netfilterKind = kind var err error @@ -411,24 +423,26 @@ func (r *linuxRouter) setupNetfilter(kind string) error { // Set implements the Router interface. func (r *linuxRouter) Set(cfg *router.Config) error { + r.mu.Lock() + defer r.mu.Unlock() var errs []error if cfg == nil { cfg = &shutdownConfig } if cfg.NetfilterKind != r.netfilterKind { - if err := r.setNetfilterMode(netfilterOff); err != nil { + if err := r.setNetfilterModeLocked(netfilterOff); err != nil { err = fmt.Errorf("could not disable existing netfilter: %w", err) errs = append(errs, err) } else { r.nfr = nil - if err := r.setupNetfilter(cfg.NetfilterKind); err != nil { + if err := r.setupNetfilterLocked(cfg.NetfilterKind); err != nil { errs = append(errs, err) } } } - if err := r.setNetfilterMode(cfg.NetfilterMode); err != nil { + if err := r.setNetfilterModeLocked(cfg.NetfilterMode); err != nil { errs = append(errs, err) } @@ -470,11 +484,11 @@ func (r *linuxRouter) Set(cfg *router.Config) error { case cfg.StatefulFiltering == r.statefulFiltering: // state already correct, nothing to do. case cfg.StatefulFiltering: - if err := r.addStatefulRule(); err != nil { + if err := r.addStatefulRuleLocked(); err != nil { errs = append(errs, err) } default: - if err := r.delStatefulRule(); err != nil { + if err := r.delStatefulRuleLocked(); err != nil { errs = append(errs, err) } } @@ -538,15 +552,17 @@ func (r *linuxRouter) updateStatefulFilteringWithDockerWarning(cfg *router.Confi r.health.SetHealthy(dockerStatefulFilteringWarnable) } -// UpdateMagicsockPort implements the Router interface. -func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error { +// updateMagicsockPort implements the Router interface. +func (r *linuxRouter) updateMagicsockPort(port uint16, network string) error { + r.mu.Lock() + defer r.mu.Unlock() if r.nfr == nil { - if err := r.setupNetfilter(r.netfilterKind); err != nil { + if err := r.setupNetfilterLocked(r.netfilterKind); err != nil { return fmt.Errorf("could not setup netfilter: %w", err) } } - var magicsockPort *atomic.Uint32 + var magicsockPort *uint16 switch network { case "udp4": magicsockPort = &r.magicsockPortV4 @@ -566,45 +582,41 @@ func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error { // set the port, we'll make the firewall rule when netfilter turns back on if r.netfilterMode == netfilterOff { - magicsockPort.Store(uint32(port)) + *magicsockPort = port return nil } - cur := magicsockPort.Load() - - if cur == uint32(port) { + if *magicsockPort == port { return nil } - if cur != 0 { - if err := r.nfr.DelMagicsockPortRule(uint16(cur), network); err != nil { + if *magicsockPort != 0 { + if err := r.nfr.DelMagicsockPortRule(*magicsockPort, network); err != nil { return fmt.Errorf("del magicsock port rule: %w", err) } } if port != 0 { - if err := r.nfr.AddMagicsockPortRule(uint16(port), network); err != nil { + if err := r.nfr.AddMagicsockPortRule(*magicsockPort, network); err != nil { return fmt.Errorf("add magicsock port rule: %w", err) } } - magicsockPort.Store(uint32(port)) + *magicsockPort = port return nil } -// setNetfilterMode switches the router to the given netfilter +// setNetfilterModeLocked switches the router to the given netfilter // mode. Netfilter state is created or deleted appropriately to // reflect the new mode, and r.snatSubnetRoutes is updated to reflect // the current state of subnet SNATing. -func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error { +func (r *linuxRouter) setNetfilterModeLocked(mode preftype.NetfilterMode) error { if !platformCanNetfilter() { mode = netfilterOff } if r.nfr == nil { - var err error - r.nfr, err = linuxfw.New(r.logf, r.netfilterKind) - if err != nil { + if err := r.setupNetfilterLocked(r.netfilterKind); err != nil { return err } } @@ -660,13 +672,13 @@ func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error { if err := r.nfr.AddBase(r.tunname); err != nil { return err } - if mport := uint16(r.magicsockPortV4.Load()); mport != 0 { - if err := r.nfr.AddMagicsockPortRule(mport, "udp4"); err != nil { + if r.magicsockPortV4 != 0 { + if err := r.nfr.AddMagicsockPortRule(r.magicsockPortV4, "udp4"); err != nil { return fmt.Errorf("could not add magicsock port rule v4: %w", err) } } - if mport := uint16(r.magicsockPortV6.Load()); mport != 0 && r.getV6FilteringAvailable() { - if err := r.nfr.AddMagicsockPortRule(mport, "udp6"); err != nil { + if r.magicsockPortV6 != 0 && r.getV6FilteringAvailable() { + if err := r.nfr.AddMagicsockPortRule(r.magicsockPortV6, "udp6"); err != nil { return fmt.Errorf("could not add magicsock port rule v6: %w", err) } } @@ -700,13 +712,13 @@ func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error { if err := r.nfr.AddBase(r.tunname); err != nil { return err } - if mport := uint16(r.magicsockPortV4.Load()); mport != 0 { - if err := r.nfr.AddMagicsockPortRule(mport, "udp4"); err != nil { + if r.magicsockPortV4 != 0 { + if err := r.nfr.AddMagicsockPortRule(r.magicsockPortV4, "udp4"); err != nil { return fmt.Errorf("could not add magicsock port rule v4: %w", err) } } - if mport := uint16(r.magicsockPortV6.Load()); mport != 0 && r.getV6FilteringAvailable() { - if err := r.nfr.AddMagicsockPortRule(mport, "udp6"); err != nil { + if r.magicsockPortV6 != 0 && r.getV6FilteringAvailable() { + if err := r.nfr.AddMagicsockPortRule(r.magicsockPortV6, "udp6"); err != nil { return fmt.Errorf("could not add magicsock port rule v6: %w", err) } } @@ -1483,9 +1495,9 @@ func (r *linuxRouter) delSNATRule() error { return nil } -// addStatefulRule adds a netfilter rule to perform stateful filtering from +// addStatefulRuleLocked adds a netfilter rule to perform stateful filtering from // subnets onto the tailnet. -func (r *linuxRouter) addStatefulRule() error { +func (r *linuxRouter) addStatefulRuleLocked() error { if r.netfilterMode == netfilterOff { return nil } @@ -1493,9 +1505,9 @@ func (r *linuxRouter) addStatefulRule() error { return r.nfr.AddStatefulRule(r.tunname) } -// delStatefulRule removes the netfilter rule to perform stateful filtering +// delStatefulRuleLocked removes the netfilter rule to perform stateful filtering // from subnets onto the tailnet. -func (r *linuxRouter) delStatefulRule() error { +func (r *linuxRouter) delStatefulRuleLocked() error { if r.netfilterMode == netfilterOff { return nil } diff --git a/wgengine/router/osrouter/router_openbsd.go b/wgengine/router/osrouter/router_openbsd.go index 8f3599309..55b485f0e 100644 --- a/wgengine/router/osrouter/router_openbsd.go +++ b/wgengine/router/osrouter/router_openbsd.go @@ -238,13 +238,6 @@ func (r *openbsdRouter) Set(cfg *router.Config) error { return errq } -// UpdateMagicsockPort implements the Router interface. This implementation -// does nothing and returns nil because this router does not currently need -// to know what the magicsock UDP port is. -func (r *openbsdRouter) UpdateMagicsockPort(_ uint16, _ string) error { - return nil -} - func (r *openbsdRouter) Close() error { cleanUp(r.logf, r.tunname) return nil diff --git a/wgengine/router/osrouter/router_plan9.go b/wgengine/router/osrouter/router_plan9.go index 5872aa7fc..a5b461a6f 100644 --- a/wgengine/router/osrouter/router_plan9.go +++ b/wgengine/router/osrouter/router_plan9.go @@ -115,13 +115,6 @@ func (r *plan9Router) Set(cfg *router.Config) error { return nil } -// UpdateMagicsockPort implements the Router interface. This implementation -// does nothing and returns nil because this router does not currently need -// to know what the magicsock UDP port is. -func (r *plan9Router) UpdateMagicsockPort(_ uint16, _ string) error { - return nil -} - func (r *plan9Router) Close() error { // TODO(bradfitz): unbind return nil diff --git a/wgengine/router/osrouter/router_userspace_bsd.go b/wgengine/router/osrouter/router_userspace_bsd.go index cdaf3adea..70ef2b6bf 100644 --- a/wgengine/router/osrouter/router_userspace_bsd.go +++ b/wgengine/router/osrouter/router_userspace_bsd.go @@ -206,13 +206,6 @@ func (r *userspaceBSDRouter) Set(cfg *router.Config) (reterr error) { return reterr } -// UpdateMagicsockPort implements the Router interface. This implementation -// does nothing and returns nil because this router does not currently need -// to know what the magicsock UDP port is. -func (r *userspaceBSDRouter) UpdateMagicsockPort(_ uint16, _ string) error { - return nil -} - func (r *userspaceBSDRouter) Close() error { return nil } diff --git a/wgengine/router/osrouter/router_windows.go b/wgengine/router/osrouter/router_windows.go index 05bf210e8..a1acbe3b6 100644 --- a/wgengine/router/osrouter/router_windows.go +++ b/wgengine/router/osrouter/router_windows.go @@ -114,13 +114,6 @@ func hasDefaultRoute(routes []netip.Prefix) bool { return false } -// UpdateMagicsockPort implements the Router interface. This implementation -// does nothing and returns nil because this router does not currently need -// to know what the magicsock UDP port is. -func (r *winRouter) UpdateMagicsockPort(_ uint16, _ string) error { - return nil -} - func (r *winRouter) Close() error { r.firewall.clear() diff --git a/wgengine/router/router.go b/wgengine/router/router.go index df65e697d..04cc89887 100644 --- a/wgengine/router/router.go +++ b/wgengine/router/router.go @@ -35,14 +35,6 @@ type Router interface { // implementation should handle gracefully. Set(*Config) error - // UpdateMagicsockPort tells the OS network stack what port magicsock - // is currently listening on, so it can be threaded through firewalls - // and such. This is distinct from Set() since magicsock may rebind - // ports independently from the Config changing. - // - // network should be either "udp4" or "udp6". - UpdateMagicsockPort(port uint16, network string) error - // Close closes the router. Close() error } @@ -56,6 +48,14 @@ type NewOpts struct { Bus *eventbus.Bus // required } +// PortUpdate is an eventbus value, reporting the port and address family +// magicsock is currently listening on, so it can be threaded through firewalls +// and such. +type PortUpdate struct { + UDPPort uint16 + EndpointNetwork string // either "udp4" or "udp6". +} + // HookNewUserspaceRouter is the registration point for router implementations // to register a constructor for userspace routers. It's meant for implementations // in wgengine/router/osrouter. diff --git a/wgengine/router/router_fake.go b/wgengine/router/router_fake.go index 549867eca..db35fc9ee 100644 --- a/wgengine/router/router_fake.go +++ b/wgengine/router/router_fake.go @@ -27,11 +27,6 @@ func (r fakeRouter) Set(cfg *Config) error { return nil } -func (r fakeRouter) UpdateMagicsockPort(_ uint16, _ string) error { - r.logf("[v1] warning: fakeRouter.UpdateMagicsockPort: not implemented.") - return nil -} - func (r fakeRouter) Close() error { r.logf("[v1] warning: fakeRouter.Close: not implemented.") return nil diff --git a/wgengine/userspace.go b/wgengine/userspace.go index e971f0e39..b8a136da7 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -398,13 +398,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) e.RequestStatus() } - onPortUpdate := func(port uint16, network string) { - e.logf("onPortUpdate(port=%v, network=%s)", port, network) - - if err := e.router.UpdateMagicsockPort(port, network); err != nil { - e.logf("UpdateMagicsockPort(port=%v, network=%s) failed: %v", port, network, err) - } - } magicsockOpts := magicsock.Options{ EventBus: e.eventBus, Logf: logf, @@ -416,7 +409,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) HealthTracker: e.health, Metrics: conf.Metrics, ControlKnobs: conf.ControlKnobs, - OnPortUpdate: onPortUpdate, PeerByKeyFunc: e.PeerByKey, } if buildfeatures.HasLazyWG {