ipn/ipnlocal: use eventbus.Monitor in LocalBackend (#17225)

This commit does not change the order or meaning of any eventbus activity, it
only updates the way the plumbing is set up.

Updates #15160

Change-Id: I06860ac4e43952a9bb4d85366138c9d9a17fd9cd
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-09-22 08:43:39 -07:00 committed by GitHub
parent e59fbaab64
commit 1b5201023f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -197,18 +197,14 @@ var (
// state machine generates events back out to zero or more components. // state machine generates events back out to zero or more components.
type LocalBackend struct { type LocalBackend struct {
// Elements that are thread-safe or constant after construction. // Elements that are thread-safe or constant after construction.
ctx context.Context // canceled by [LocalBackend.Shutdown] ctx context.Context // canceled by [LocalBackend.Shutdown]
ctxCancel context.CancelCauseFunc // cancels ctx ctxCancel context.CancelCauseFunc // cancels ctx
logf logger.Logf // general logging logf logger.Logf // general logging
keyLogf logger.Logf // for printing list of peers on change keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change statsLogf logger.Logf // for printing peers stats on change
sys *tsd.System sys *tsd.System
eventClient *eventbus.Client eventSubs eventbus.Monitor
clientVersionSub *eventbus.Subscriber[tailcfg.ClientVersion]
autoUpdateSub *eventbus.Subscriber[controlclient.AutoUpdate]
healthChangeSub *eventbus.Subscriber[health.Change]
changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
subsDoneCh chan struct{} // closed when consumeEventbusTopics returns
health *health.Tracker // always non-nil health *health.Tracker // always non-nil
polc policyclient.Client // always non-nil polc policyclient.Client // always non-nil
metrics metrics metrics metrics
@ -538,13 +534,10 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCtx: captiveCtx, captiveCtx: captiveCtx,
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool), needsCaptiveDetection: make(chan bool),
subsDoneCh: make(chan struct{}),
} }
b.eventClient = b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
b.clientVersionSub = eventbus.Subscribe[tailcfg.ClientVersion](b.eventClient) b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec))
b.autoUpdateSub = eventbus.Subscribe[controlclient.AutoUpdate](b.eventClient)
b.healthChangeSub = eventbus.Subscribe[health.Change](b.eventClient)
b.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](b.eventClient)
nb := newNodeBackend(ctx, b.sys.Bus.Get()) nb := newNodeBackend(ctx, b.sys.Bus.Get())
b.currentNodeAtomic.Store(nb) b.currentNodeAtomic.Store(nb)
nb.ready() nb.ready()
@ -611,7 +604,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
} }
} }
} }
go b.consumeEventbusTopics()
return b, nil return b, nil
} }
@ -620,21 +612,26 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
// always handled in the order they are received, i.e. the next event is not // always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the // read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed. // [eventbus.Client] is closed.
func (b *LocalBackend) consumeEventbusTopics() { func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) {
defer close(b.subsDoneCh) clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec)
autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec)
healthChangeSub := eventbus.Subscribe[health.Change](ec)
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
for { return func(ec *eventbus.Client) {
select { for {
case <-b.eventClient.Done(): select {
return case <-ec.Done():
case clientVersion := <-b.clientVersionSub.Events(): return
b.onClientVersion(&clientVersion) case clientVersion := <-clientVersionSub.Events():
case au := <-b.autoUpdateSub.Events(): b.onClientVersion(&clientVersion)
b.onTailnetDefaultAutoUpdate(au.Value) case au := <-autoUpdateSub.Events():
case change := <-b.healthChangeSub.Events(): b.onTailnetDefaultAutoUpdate(au.Value)
b.onHealthChange(change) case change := <-healthChangeSub.Events():
case changeDelta := <-b.changeDeltaSub.Events(): b.onHealthChange(change)
b.linkChange(&changeDelta) case changeDelta := <-changeDeltaSub.Events():
b.linkChange(&changeDelta)
}
} }
} }
} }
@ -1103,8 +1100,7 @@ func (b *LocalBackend) Shutdown() {
// they can deadlock with c.Shutdown(). // they can deadlock with c.Shutdown().
// 2. LocalBackend.consumeEventbusTopics event handlers may not guard against // 2. LocalBackend.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress LocalBackend.Shutdown() behaviors. // undesirable post/in-progress LocalBackend.Shutdown() behaviors.
b.eventClient.Close() b.eventSubs.Close()
<-b.subsDoneCh
b.em.close() b.em.close()