diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 7ac8f0ecb..4af0a3aa6 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -197,18 +197,14 @@ var ( // state machine generates events back out to zero or more components. type LocalBackend struct { // Elements that are thread-safe or constant after construction. - ctx context.Context // canceled by [LocalBackend.Shutdown] - ctxCancel context.CancelCauseFunc // cancels ctx - logf logger.Logf // general logging - keyLogf logger.Logf // for printing list of peers on change - statsLogf logger.Logf // for printing peers stats on change - sys *tsd.System - eventClient *eventbus.Client - clientVersionSub *eventbus.Subscriber[tailcfg.ClientVersion] - autoUpdateSub *eventbus.Subscriber[controlclient.AutoUpdate] - healthChangeSub *eventbus.Subscriber[health.Change] - changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] - subsDoneCh chan struct{} // closed when consumeEventbusTopics returns + ctx context.Context // canceled by [LocalBackend.Shutdown] + ctxCancel context.CancelCauseFunc // cancels ctx + logf logger.Logf // general logging + keyLogf logger.Logf // for printing list of peers on change + statsLogf logger.Logf // for printing peers stats on change + sys *tsd.System + eventSubs eventbus.Monitor + health *health.Tracker // always non-nil polc policyclient.Client // always non-nil metrics metrics @@ -538,13 +534,10 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCtx: captiveCtx, captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running needsCaptiveDetection: make(chan bool), - subsDoneCh: make(chan struct{}), } - b.eventClient = b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") - b.clientVersionSub = eventbus.Subscribe[tailcfg.ClientVersion](b.eventClient) - b.autoUpdateSub = eventbus.Subscribe[controlclient.AutoUpdate](b.eventClient) - b.healthChangeSub = eventbus.Subscribe[health.Change](b.eventClient) - b.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](b.eventClient) + ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") + b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec)) + nb := newNodeBackend(ctx, b.sys.Bus.Get()) b.currentNodeAtomic.Store(nb) nb.ready() @@ -611,7 +604,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo } } } - go b.consumeEventbusTopics() return b, nil } @@ -620,21 +612,26 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the // [eventbus.Client] is closed. -func (b *LocalBackend) consumeEventbusTopics() { - defer close(b.subsDoneCh) +func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { + clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec) + autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec) + healthChangeSub := eventbus.Subscribe[health.Change](ec) + changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec) - for { - select { - case <-b.eventClient.Done(): - return - case clientVersion := <-b.clientVersionSub.Events(): - b.onClientVersion(&clientVersion) - case au := <-b.autoUpdateSub.Events(): - b.onTailnetDefaultAutoUpdate(au.Value) - case change := <-b.healthChangeSub.Events(): - b.onHealthChange(change) - case changeDelta := <-b.changeDeltaSub.Events(): - b.linkChange(&changeDelta) + return func(ec *eventbus.Client) { + for { + select { + case <-ec.Done(): + return + case clientVersion := <-clientVersionSub.Events(): + b.onClientVersion(&clientVersion) + case au := <-autoUpdateSub.Events(): + b.onTailnetDefaultAutoUpdate(au.Value) + case change := <-healthChangeSub.Events(): + b.onHealthChange(change) + case changeDelta := <-changeDeltaSub.Events(): + b.linkChange(&changeDelta) + } } } } @@ -1103,8 +1100,7 @@ func (b *LocalBackend) Shutdown() { // they can deadlock with c.Shutdown(). // 2. LocalBackend.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress LocalBackend.Shutdown() behaviors. - b.eventClient.Close() - <-b.subsDoneCh + b.eventSubs.Close() b.em.close()