From df362d0a0899e57b7e11e5de397b3688e850847b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20Lensb=C3=B8l?= Date: Wed, 17 Sep 2025 10:49:41 -0400 Subject: [PATCH] net/netmon: make ChangeDelta event not a pointer (#17112) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes things work slightly better over the eventbus. Also switches ipnlocal to use the event over the eventbus instead of the direct callback. Updates #15160 Signed-off-by: Claus Lensbøl --- ipn/ipnlocal/local.go | 11 +++--- net/netmon/netmon.go | 12 +++---- net/netmon/netmon_test.go | 2 +- wgengine/userspace.go | 76 +++++++++++++++++++++++++-------------- 4 files changed, 60 insertions(+), 41 deletions(-) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index a712dc98a..017349165 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -207,6 +207,7 @@ type LocalBackend struct { clientVersionSub *eventbus.Subscriber[tailcfg.ClientVersion] autoUpdateSub *eventbus.Subscriber[controlclient.AutoUpdate] healthChangeSub *eventbus.Subscriber[health.Change] + changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] subsDoneCh chan struct{} // closed when consumeEventbusTopics returns health *health.Tracker // always non-nil polc policyclient.Client // always non-nil @@ -216,7 +217,6 @@ type LocalBackend struct { dialer *tsdial.Dialer // non-nil; TODO(bradfitz): remove; use sys pushDeviceToken syncs.AtomicValue[string] backendLogID logid.PublicID - unregisterNetMon func() unregisterSysPolicyWatch func() portpoll *portlist.Poller // may be nil portpollOnce sync.Once // guards starting readPoller @@ -544,6 +544,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo b.clientVersionSub = eventbus.Subscribe[tailcfg.ClientVersion](b.eventClient) b.autoUpdateSub = eventbus.Subscribe[controlclient.AutoUpdate](b.eventClient) b.healthChangeSub = eventbus.Subscribe[health.Change](b.eventClient) + b.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](b.eventClient) nb := newNodeBackend(ctx, b.sys.Bus.Get()) b.currentNodeAtomic.Store(nb) nb.ready() @@ -591,10 +592,9 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo b.e.SetStatusCallback(b.setWgengineStatus) b.prevIfState = netMon.InterfaceState() - // Call our linkChange code once with the current state, and - // then also whenever it changes: + // Call our linkChange code once with the current state. + // Following changes are triggered via the eventbus. b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()}) - b.unregisterNetMon = netMon.RegisterChangeCallback(b.linkChange) if tunWrap, ok := b.sys.Tun.GetOK(); ok { tunWrap.PeerAPIPort = b.GetPeerAPIPort @@ -633,6 +633,8 @@ func (b *LocalBackend) consumeEventbusTopics() { b.onTailnetDefaultAutoUpdate(au.Value) case change := <-b.healthChangeSub.Events(): b.onHealthChange(change) + case changeDelta := <-b.changeDeltaSub.Events(): + b.linkChange(&changeDelta) } } } @@ -1160,7 +1162,6 @@ func (b *LocalBackend) Shutdown() { } b.stopOfflineAutoUpdate() - b.unregisterNetMon() b.unregisterSysPolicyWatch() if cc != nil { cc.Shutdown() diff --git a/net/netmon/netmon.go b/net/netmon/netmon.go index b97b184d4..fcac9c4ee 100644 --- a/net/netmon/netmon.go +++ b/net/netmon/netmon.go @@ -53,7 +53,7 @@ type osMon interface { type Monitor struct { logf logger.Logf b *eventbus.Client - changed *eventbus.Publisher[*ChangeDelta] + changed *eventbus.Publisher[ChangeDelta] om osMon // nil means not supported on this platform change chan bool // send false to wake poller, true to also force ChangeDeltas be sent @@ -84,9 +84,6 @@ type ChangeFunc func(*ChangeDelta) // ChangeDelta describes the difference between two network states. type ChangeDelta struct { - // Monitor is the network monitor that sent this delta. - Monitor *Monitor - // Old is the old interface state, if known. // It's nil if the old state is unknown. // Do not mutate it. @@ -126,7 +123,7 @@ func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) { stop: make(chan struct{}), lastWall: wallTime(), } - m.changed = eventbus.Publish[*ChangeDelta](m.b) + m.changed = eventbus.Publish[ChangeDelta](m.b) st, err := m.interfaceStateUncached() if err != nil { return nil, err @@ -401,8 +398,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) { return } - delta := &ChangeDelta{ - Monitor: m, + delta := ChangeDelta{ Old: oldState, New: newState, TimeJumped: timeJumped, @@ -437,7 +433,7 @@ func (m *Monitor) handlePotentialChange(newState *State, forceCallbacks bool) { } m.changed.Publish(delta) for _, cb := range m.cbs { - go cb(delta) + go cb(&delta) } } diff --git a/net/netmon/netmon_test.go b/net/netmon/netmon_test.go index b8ec1b75f..5fcdcc6cc 100644 --- a/net/netmon/netmon_test.go +++ b/net/netmon/netmon_test.go @@ -81,7 +81,7 @@ func TestMonitorInjectEventOnBus(t *testing.T) { mon.Start() mon.InjectEvent() - if err := eventbustest.Expect(tw, eventbustest.Type[*ChangeDelta]()); err != nil { + if err := eventbustest.Expect(tw, eventbustest.Type[ChangeDelta]()); err != nil { t.Error(err) } } diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 4a9f32143..42c12c008 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -93,26 +93,28 @@ const networkLoggerUploadTimeout = 5 * time.Second type userspaceEngine struct { // eventBus will eventually become required, but for now may be nil. // TODO(creachadair): Enforce that this is non-nil at construction. - eventBus *eventbus.Bus + eventBus *eventbus.Bus + eventClient *eventbus.Client + changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] + subsDoneCh chan struct{} // closed when consumeEventbusTopics returns - logf logger.Logf - wgLogger *wglog.Logger // a wireguard-go logging wrapper - reqCh chan struct{} - waitCh chan struct{} // chan is closed when first Close call completes; contrast with closing bool - timeNow func() mono.Time - tundev *tstun.Wrapper - wgdev *device.Device - router router.Router - dialer *tsdial.Dialer - confListenPort uint16 // original conf.ListenPort - dns *dns.Manager - magicConn *magicsock.Conn - netMon *netmon.Monitor - health *health.Tracker - netMonOwned bool // whether we created netMon (and thus need to close it) - netMonUnregister func() // unsubscribes from changes; used regardless of netMonOwned - birdClient BIRDClient // or nil - controlKnobs *controlknobs.Knobs // or nil + logf logger.Logf + wgLogger *wglog.Logger // a wireguard-go logging wrapper + reqCh chan struct{} + waitCh chan struct{} // chan is closed when first Close call completes; contrast with closing bool + timeNow func() mono.Time + tundev *tstun.Wrapper + wgdev *device.Device + router router.Router + dialer *tsdial.Dialer + confListenPort uint16 // original conf.ListenPort + dns *dns.Manager + magicConn *magicsock.Conn + netMon *netmon.Monitor + health *health.Tracker + netMonOwned bool // whether we created netMon (and thus need to close it) + birdClient BIRDClient // or nil + controlKnobs *controlknobs.Knobs // or nil testMaybeReconfigHook func() // for tests; if non-nil, fires if maybeReconfigWireguardLocked called @@ -352,7 +354,11 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) controlKnobs: conf.ControlKnobs, reconfigureVPN: conf.ReconfigureVPN, health: conf.HealthTracker, + subsDoneCh: make(chan struct{}), } + e.eventClient = e.eventBus.Client("userspaceEngine") + e.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](e.eventClient) + closePool.addFunc(e.eventClient.Close) if e.birdClient != nil { // Disable the protocol at start time. @@ -385,13 +391,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) logf("link state: %+v", e.netMon.InterfaceState()) - unregisterMonWatch := e.netMon.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { - tshttpproxy.InvalidateCache() - e.linkChange(delta) - }) - closePool.addFunc(unregisterMonWatch) - e.netMonUnregister = unregisterMonWatch - endpointsFn := func(endpoints []tailcfg.Endpoint) { e.mu.Lock() e.endpoints = append(e.endpoints[:0], endpoints...) @@ -546,10 +545,31 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } } + go e.consumeEventbusTopics() + e.logf("Engine created.") return e, nil } +// consumeEventbusTopics consumes events from all relevant +// [eventbus.Subscriber]'s and passes them to their related handler. Events are +// always handled in the order they are received, i.e. the next event is not +// read until the previous event's handler has returned. It returns when the +// [eventbus.Client] is closed. +func (e *userspaceEngine) consumeEventbusTopics() { + defer close(e.subsDoneCh) + + for { + select { + case <-e.eventClient.Done(): + return + case changeDelta := <-e.changeDeltaSub.Events(): + tshttpproxy.InvalidateCache() + e.linkChange(&changeDelta) + } + } +} + // echoRespondToAll is an inbound post-filter responding to all echo requests. func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) { if p.IsEchoRequest() { @@ -1208,6 +1228,9 @@ func (e *userspaceEngine) RequestStatus() { } func (e *userspaceEngine) Close() { + e.eventClient.Close() + <-e.subsDoneCh + e.mu.Lock() if e.closing { e.mu.Unlock() @@ -1219,7 +1242,6 @@ func (e *userspaceEngine) Close() { r := bufio.NewReader(strings.NewReader("")) e.wgdev.IpcSetOperation(r) e.magicConn.Close() - e.netMonUnregister() if e.netMonOwned { e.netMon.Close() }