wgengine/magicsock: use eventbus.SubscribeFunc in Conn

Updates #15160
Updates #17487

Change-Id: Ic9eb8d82b21d9dc38cb3c681b87101dfbc95af16
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-10-07 17:03:39 -07:00 committed by M. J. Fromberger
parent 5833730577
commit 241ea1c98b

View File

@ -156,7 +156,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
eventSubs eventbus.Monitor
eventClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@ -625,43 +625,6 @@ func newConn(logf logger.Logf) *Conn {
return c
}
// consumeEventbusTopics consumes events from all [Conn]-relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
// Subscribe calls must return before NewConn otherwise published
// events can be missed.
pmSub := eventbus.Subscribe[portmappertype.Mapping](cli)
filterSub := eventbus.Subscribe[FilterUpdate](cli)
nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli)
nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli)
syncSub := eventbus.Subscribe[syncPoint](cli)
allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli)
return func(cli *eventbus.Client) {
for {
select {
case <-cli.Done():
return
case <-pmSub.Events():
c.onPortMapChanged()
case filterUpdate := <-filterSub.Events():
c.onFilterUpdate(filterUpdate)
case nodeViews := <-nodeViewsSub.Events():
c.onNodeViewsUpdate(nodeViews)
case nodeMuts := <-nodeMutsSub.Events():
c.onNodeMutationsUpdate(nodeMuts)
case syncPoint := <-syncSub.Events():
c.dlogf("magicsock: received sync point after reconfig")
syncPoint.Signal()
case allocResp := <-allocRelayEndpointSub.Events():
c.onUDPRelayAllocResp(allocResp)
}
}
}
}
func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
c.mu.Lock()
defer c.mu.Unlock()
@ -726,11 +689,20 @@ func NewConn(opts Options) (*Conn, error) {
// Set up publishers and subscribers. Subscribe calls must return before
// NewConn otherwise published events can be missed.
cli := c.eventBus.Client("magicsock.Conn")
c.syncPub = eventbus.Publish[syncPoint](cli)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
c.portUpdatePub = eventbus.Publish[router.PortUpdate](cli)
c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
ec := c.eventBus.Client("magicsock.Conn")
c.eventClient = ec
c.syncPub = eventbus.Publish[syncPoint](ec)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec)
eventbus.SubscribeFunc(ec, c.onPortMapChanged)
eventbus.SubscribeFunc(ec, c.onFilterUpdate)
eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate)
eventbus.SubscribeFunc(ec, func(sp syncPoint) {
c.dlogf("magicsock: received sync point after reconfig")
sp.Signal()
})
eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp)
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
c.donec = c.connCtx.Done()
@ -3307,13 +3279,12 @@ func (c *connBind) isClosed() bool {
//
// Only the first close does anything. Any later closes return nil.
func (c *Conn) Close() error {
// Close the [eventbus.Client] and wait for c.consumeEventbusTopics to
// Close the [eventbus.Client] to wait for subscribers to
// return before acquiring c.mu:
// 1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
c.eventSubs.Close()
// 1. Event handlers also acquire c.mu, they can deadlock with c.Close().
// 2. Event handlers may not guard against undesirable post/in-progress
// Conn.Close() behaviors.
c.eventClient.Close()
c.mu.Lock()
defer c.mu.Unlock()
@ -3410,7 +3381,7 @@ func (c *Conn) shouldDoPeriodicReSTUNLocked() bool {
return true
}
func (c *Conn) onPortMapChanged() { c.ReSTUN("portmap-changed") }
func (c *Conn) onPortMapChanged(portmappertype.Mapping) { c.ReSTUN("portmap-changed") }
// ReSTUN triggers an address discovery.
// The provided why string is for debug logging only.