controlclient/auto: switch eventbus to using a monitor

Only changes how the go routine consuming the events starts and stops,
not what it does.

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl 2025-09-19 16:19:40 -04:00
parent ca9d795006
commit ddc5df1f82
No known key found for this signature in database
GPG Key ID: 060429CBEC62B1B4

View File

@ -123,9 +123,7 @@ type Auto struct {
observerQueue execqueue.ExecQueue
shutdownFn func() // to be called prior to shutdown or nil
eventClient *eventbus.Client
healthChangeSub *eventbus.Subscriber[health.Change]
subsDoneCh chan struct{} // close-only channel when eventClient has closed
eventSubs eventbus.Monitor
mu sync.Mutex // mutex guards the following fields
@ -195,11 +193,11 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
updateDone: make(chan struct{}),
observer: opts.Observer,
shutdownFn: opts.Shutdown,
subsDoneCh: make(chan struct{}),
}
c.eventClient = opts.Bus.Client("controlClient.Auto")
c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient)
// Set up eventbus client and subscriber
cli := opts.Bus.Client("controlClient.Auto")
c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
c.authCtx, c.authCancel = context.WithCancel(context.Background())
c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
@ -207,7 +205,6 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf)
go c.consumeEventbusTopics()
return c, nil
}
@ -216,20 +213,21 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (c *Auto) consumeEventbusTopics() {
defer close(c.subsDoneCh)
func (c *Auto) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
healthChangeSub := eventbus.Subscribe[health.Change](cli)
return func(cli *eventbus.Client) {
for {
select {
case <-c.eventClient.Done():
case <-cli.Done():
return
case change := <-c.healthChangeSub.Events():
case change := <-healthChangeSub.Events():
if change.WarnableChanged {
c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState)
}
}
}
}
}
// SetPaused controls whether HTTP activity should be paused.
//
@ -784,8 +782,7 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
}
func (c *Auto) Shutdown() {
c.eventClient.Close()
<-c.subsDoneCh
c.eventSubs.Close()
c.mu.Lock()
if c.closed {