ipn/ipnlocal: use eventbus.SubscribeFunc in LocalBackend (#17524)

This does not change which subscriptions are made, it only swaps them to use
the SubscribeFunc API instead of Subscribe.

Updates #15160
Updates #17487

Change-Id: Id56027836c96942206200567a118f8bcf9c07f64
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-10-20 12:22:16 -07:00 committed by GitHub
parent bf47d8e72b
commit 3dde233cd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -186,7 +186,7 @@ type LocalBackend struct {
keyLogf logger.Logf // for printing list of peers on change keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change statsLogf logger.Logf // for printing peers stats on change
sys *tsd.System sys *tsd.System
eventSubs eventbus.Monitor eventClient *eventbus.Client
health *health.Tracker // always non-nil health *health.Tracker // always non-nil
polc policyclient.Client // always non-nil polc policyclient.Client // always non-nil
@ -589,54 +589,25 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
// Start the event bus late, once all the assignments above are done. // Start the event bus late, once all the assignments above are done.
// (See previous race in tailscale/tailscale#17252) // (See previous race in tailscale/tailscale#17252)
ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec)) b.eventClient = ec
eventbus.SubscribeFunc(ec, b.onClientVersion)
eventbus.SubscribeFunc(ec, func(au controlclient.AutoUpdate) {
b.onTailnetDefaultAutoUpdate(au.Value)
})
eventbus.SubscribeFunc(ec, func(cd netmon.ChangeDelta) { b.linkChange(&cd) })
if buildfeatures.HasHealth {
eventbus.SubscribeFunc(ec, b.onHealthChange)
}
if buildfeatures.HasPortList {
eventbus.SubscribeFunc(ec, b.setPortlistServices)
}
eventbus.SubscribeFunc(ec, b.onAppConnectorRouteUpdate)
eventbus.SubscribeFunc(ec, b.onAppConnectorStoreRoutes)
return b, nil return b, nil
} }
// consumeEventbusTopics consumes events from all relevant func (b *LocalBackend) onAppConnectorRouteUpdate(ru appctype.RouteUpdate) {
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) {
clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec)
autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec)
var healthChange <-chan health.Change
if buildfeatures.HasHealth {
healthChangeSub := eventbus.Subscribe[health.Change](ec)
healthChange = healthChangeSub.Events()
}
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
routeUpdateSub := eventbus.Subscribe[appctype.RouteUpdate](ec)
storeRoutesSub := eventbus.Subscribe[appctype.RouteInfo](ec)
var portlist <-chan PortlistServices
if buildfeatures.HasPortList {
portlistSub := eventbus.Subscribe[PortlistServices](ec)
portlist = portlistSub.Events()
}
return func(ec *eventbus.Client) {
for {
select {
case <-ec.Done():
return
case clientVersion := <-clientVersionSub.Events():
b.onClientVersion(&clientVersion)
case au := <-autoUpdateSub.Events():
b.onTailnetDefaultAutoUpdate(au.Value)
case change := <-healthChange:
b.onHealthChange(change)
case changeDelta := <-changeDeltaSub.Events():
b.linkChange(&changeDelta)
case pl := <-portlist:
if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
b.setPortlistServices(pl)
}
case ru := <-routeUpdateSub.Events():
// TODO(creachadair, 2025-10-02): It is currently possible for updates produced under // TODO(creachadair, 2025-10-02): It is currently possible for updates produced under
// one profile to arrive and be applied after a switch to another profile. // one profile to arrive and be applied after a switch to another profile.
// We need to find a way to ensure that changes to the backend state are applied // We need to find a way to ensure that changes to the backend state are applied
@ -648,7 +619,9 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
if err := b.UnadvertiseRoute(ru.Unadvertise...); err != nil { if err := b.UnadvertiseRoute(ru.Unadvertise...); err != nil {
b.logf("appc: failed to unadvertise routes: %v: %v", ru.Unadvertise, err) b.logf("appc: failed to unadvertise routes: %v: %v", ru.Unadvertise, err)
} }
case ri := <-storeRoutesSub.Events(): }
func (b *LocalBackend) onAppConnectorStoreRoutes(ri appctype.RouteInfo) {
// Whether or not routes should be stored can change over time. // Whether or not routes should be stored can change over time.
shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load() shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load()
if shouldStoreRoutes { if shouldStoreRoutes {
@ -657,9 +630,6 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
} }
} }
} }
}
}
}
func (b *LocalBackend) Clock() tstime.Clock { return b.clock } func (b *LocalBackend) Clock() tstime.Clock { return b.clock }
func (b *LocalBackend) Sys() *tsd.System { return b.sys } func (b *LocalBackend) Sys() *tsd.System { return b.sys }
@ -1107,13 +1077,12 @@ func (b *LocalBackend) ClearCaptureSink() {
// Shutdown halts the backend and all its sub-components. The backend // Shutdown halts the backend and all its sub-components. The backend
// can no longer be used after Shutdown returns. // can no longer be used after Shutdown returns.
func (b *LocalBackend) Shutdown() { func (b *LocalBackend) Shutdown() {
// Close the [eventbus.Client] and wait for LocalBackend.consumeEventbusTopics // Close the [eventbus.Client] to wait for subscribers to
// to return. Do this before acquiring b.mu: // return before acquiring b.mu:
// 1. LocalBackend.consumeEventbusTopics event handlers also acquire b.mu, // 1. Event handlers also acquire b.mu, they can deadlock with c.Shutdown().
// they can deadlock with c.Shutdown(). // 2. Event handlers may not guard against undesirable post/in-progress
// 2. LocalBackend.consumeEventbusTopics event handlers may not guard against // LocalBackend.Shutdown() behaviors.
// undesirable post/in-progress LocalBackend.Shutdown() behaviors. b.eventClient.Close()
b.eventSubs.Close()
b.em.close() b.em.close()