From 3dde233cd3aed75f610b63ea33ab1baa9198c81b Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Mon, 20 Oct 2025 12:22:16 -0700 Subject: [PATCH] ipn/ipnlocal: use eventbus.SubscribeFunc in LocalBackend (#17524) This does not change which subscriptions are made, it only swaps them to use the SubscribeFunc API instead of Subscribe. Updates #15160 Updates #17487 Change-Id: Id56027836c96942206200567a118f8bcf9c07f64 Signed-off-by: M. J. Fromberger --- ipn/ipnlocal/local.go | 119 ++++++++++++++++-------------------------- 1 file changed, 44 insertions(+), 75 deletions(-) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 36e4ad8a5..ee3059de4 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -180,13 +180,13 @@ var ( // state machine generates events back out to zero or more components. type LocalBackend struct { // Elements that are thread-safe or constant after construction. - ctx context.Context // canceled by [LocalBackend.Shutdown] - ctxCancel context.CancelCauseFunc // cancels ctx - logf logger.Logf // general logging - keyLogf logger.Logf // for printing list of peers on change - statsLogf logger.Logf // for printing peers stats on change - sys *tsd.System - eventSubs eventbus.Monitor + ctx context.Context // canceled by [LocalBackend.Shutdown] + ctxCancel context.CancelCauseFunc // cancels ctx + logf logger.Logf // general logging + keyLogf logger.Logf // for printing list of peers on change + statsLogf logger.Logf // for printing peers stats on change + sys *tsd.System + eventClient *eventbus.Client health *health.Tracker // always non-nil polc policyclient.Client // always non-nil @@ -589,74 +589,44 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo // Start the event bus late, once all the assignments above are done. // (See previous race in tailscale/tailscale#17252) ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") - b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec)) + b.eventClient = ec + eventbus.SubscribeFunc(ec, b.onClientVersion) + eventbus.SubscribeFunc(ec, func(au controlclient.AutoUpdate) { + b.onTailnetDefaultAutoUpdate(au.Value) + }) + eventbus.SubscribeFunc(ec, func(cd netmon.ChangeDelta) { b.linkChange(&cd) }) + if buildfeatures.HasHealth { + eventbus.SubscribeFunc(ec, b.onHealthChange) + } + if buildfeatures.HasPortList { + eventbus.SubscribeFunc(ec, b.setPortlistServices) + } + eventbus.SubscribeFunc(ec, b.onAppConnectorRouteUpdate) + eventbus.SubscribeFunc(ec, b.onAppConnectorStoreRoutes) return b, nil } -// consumeEventbusTopics consumes events from all relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { - clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec) - autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec) - - var healthChange <-chan health.Change - if buildfeatures.HasHealth { - healthChangeSub := eventbus.Subscribe[health.Change](ec) - healthChange = healthChangeSub.Events() +func (b *LocalBackend) onAppConnectorRouteUpdate(ru appctype.RouteUpdate) { + // TODO(creachadair, 2025-10-02): It is currently possible for updates produced under + // one profile to arrive and be applied after a switch to another profile. + // We need to find a way to ensure that changes to the backend state are applied + // consistently in the presnce of profile changes, which currently may not happen in + // a single atomic step. See: https://github.com/tailscale/tailscale/issues/17414 + if err := b.AdvertiseRoute(ru.Advertise...); err != nil { + b.logf("appc: failed to advertise routes: %v: %v", ru.Advertise, err) } - changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec) - routeUpdateSub := eventbus.Subscribe[appctype.RouteUpdate](ec) - storeRoutesSub := eventbus.Subscribe[appctype.RouteInfo](ec) - - var portlist <-chan PortlistServices - if buildfeatures.HasPortList { - portlistSub := eventbus.Subscribe[PortlistServices](ec) - portlist = portlistSub.Events() + if err := b.UnadvertiseRoute(ru.Unadvertise...); err != nil { + b.logf("appc: failed to unadvertise routes: %v: %v", ru.Unadvertise, err) } +} - return func(ec *eventbus.Client) { - for { - select { - case <-ec.Done(): - return - case clientVersion := <-clientVersionSub.Events(): - b.onClientVersion(&clientVersion) - case au := <-autoUpdateSub.Events(): - b.onTailnetDefaultAutoUpdate(au.Value) - case change := <-healthChange: - b.onHealthChange(change) - case changeDelta := <-changeDeltaSub.Events(): - b.linkChange(&changeDelta) - - case pl := <-portlist: - if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans - b.setPortlistServices(pl) - } - case ru := <-routeUpdateSub.Events(): - // TODO(creachadair, 2025-10-02): It is currently possible for updates produced under - // one profile to arrive and be applied after a switch to another profile. - // We need to find a way to ensure that changes to the backend state are applied - // consistently in the presnce of profile changes, which currently may not happen in - // a single atomic step. See: https://github.com/tailscale/tailscale/issues/17414 - if err := b.AdvertiseRoute(ru.Advertise...); err != nil { - b.logf("appc: failed to advertise routes: %v: %v", ru.Advertise, err) - } - if err := b.UnadvertiseRoute(ru.Unadvertise...); err != nil { - b.logf("appc: failed to unadvertise routes: %v: %v", ru.Unadvertise, err) - } - case ri := <-storeRoutesSub.Events(): - // Whether or not routes should be stored can change over time. - shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load() - if shouldStoreRoutes { - if err := b.storeRouteInfo(ri); err != nil { - b.logf("appc: failed to store route info: %v", err) - } - } - } +func (b *LocalBackend) onAppConnectorStoreRoutes(ri appctype.RouteInfo) { + // Whether or not routes should be stored can change over time. + shouldStoreRoutes := b.ControlKnobs().AppCStoreRoutes.Load() + if shouldStoreRoutes { + if err := b.storeRouteInfo(ri); err != nil { + b.logf("appc: failed to store route info: %v", err) } } } @@ -1107,13 +1077,12 @@ func (b *LocalBackend) ClearCaptureSink() { // Shutdown halts the backend and all its sub-components. The backend // can no longer be used after Shutdown returns. func (b *LocalBackend) Shutdown() { - // Close the [eventbus.Client] and wait for LocalBackend.consumeEventbusTopics - // to return. Do this before acquiring b.mu: - // 1. LocalBackend.consumeEventbusTopics event handlers also acquire b.mu, - // they can deadlock with c.Shutdown(). - // 2. LocalBackend.consumeEventbusTopics event handlers may not guard against - // undesirable post/in-progress LocalBackend.Shutdown() behaviors. - b.eventSubs.Close() + // Close the [eventbus.Client] to wait for subscribers to + // return before acquiring b.mu: + // 1. Event handlers also acquire b.mu, they can deadlock with c.Shutdown(). + // 2. Event handlers may not guard against undesirable post/in-progress + // LocalBackend.Shutdown() behaviors. + b.eventClient.Close() b.em.close()