From 241ea1c98bdfc6e28497340aa57ff46b7604ed68 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 7 Oct 2025 17:03:39 -0700 Subject: [PATCH] wgengine/magicsock: use eventbus.SubscribeFunc in Conn Updates #15160 Updates #17487 Change-Id: Ic9eb8d82b21d9dc38cb3c681b87101dfbc95af16 Signed-off-by: M. J. Fromberger --- wgengine/magicsock/magicsock.go | 71 ++++++++++----------------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index c7d07c277..492dff2ce 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -156,7 +156,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus - eventSubs eventbus.Monitor + eventClient *eventbus.Client logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -625,43 +625,6 @@ func newConn(logf logger.Logf) *Conn { return c } -// consumeEventbusTopics consumes events from all [Conn]-relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - pmSub := eventbus.Subscribe[portmappertype.Mapping](cli) - filterSub := eventbus.Subscribe[FilterUpdate](cli) - nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli) - nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli) - syncSub := eventbus.Subscribe[syncPoint](cli) - allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli) - return func(cli *eventbus.Client) { - for { - select { - case <-cli.Done(): - return - case <-pmSub.Events(): - c.onPortMapChanged() - case filterUpdate := <-filterSub.Events(): - c.onFilterUpdate(filterUpdate) - case nodeViews := <-nodeViewsSub.Events(): - c.onNodeViewsUpdate(nodeViews) - case nodeMuts := <-nodeMutsSub.Events(): - c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() - case allocResp := <-allocRelayEndpointSub.Events(): - c.onUDPRelayAllocResp(allocResp) - } - } - } -} - func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) { c.mu.Lock() defer c.mu.Unlock() @@ -726,11 +689,20 @@ func NewConn(opts Options) (*Conn, error) { // Set up publishers and subscribers. Subscribe calls must return before // NewConn otherwise published events can be missed. - cli := c.eventBus.Client("magicsock.Conn") - c.syncPub = eventbus.Publish[syncPoint](cli) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) - c.portUpdatePub = eventbus.Publish[router.PortUpdate](cli) - c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) + ec := c.eventBus.Client("magicsock.Conn") + c.eventClient = ec + c.syncPub = eventbus.Publish[syncPoint](ec) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec) + c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec) + eventbus.SubscribeFunc(ec, c.onPortMapChanged) + eventbus.SubscribeFunc(ec, c.onFilterUpdate) + eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate) + eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate) + eventbus.SubscribeFunc(ec, func(sp syncPoint) { + c.dlogf("magicsock: received sync point after reconfig") + sp.Signal() + }) + eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.donec = c.connCtx.Done() @@ -3307,13 +3279,12 @@ func (c *connBind) isClosed() bool { // // Only the first close does anything. Any later closes return nil. func (c *Conn) Close() error { - // Close the [eventbus.Client] and wait for c.consumeEventbusTopics to + // Close the [eventbus.Client] to wait for subscribers to // return before acquiring c.mu: - // 1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can - // deadlock with c.Close(). - // 2. Conn.consumeEventbusTopics event handlers may not guard against - // undesirable post/in-progress Conn.Close() behaviors. - c.eventSubs.Close() + // 1. Event handlers also acquire c.mu, they can deadlock with c.Close(). + // 2. Event handlers may not guard against undesirable post/in-progress + // Conn.Close() behaviors. + c.eventClient.Close() c.mu.Lock() defer c.mu.Unlock() @@ -3410,7 +3381,7 @@ func (c *Conn) shouldDoPeriodicReSTUNLocked() bool { return true } -func (c *Conn) onPortMapChanged() { c.ReSTUN("portmap-changed") } +func (c *Conn) onPortMapChanged(portmappertype.Mapping) { c.ReSTUN("portmap-changed") } // ReSTUN triggers an address discovery. // The provided why string is for debug logging only.