diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index bbc129c5e..3de1cb81e 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -123,9 +123,7 @@ type Auto struct { observerQueue execqueue.ExecQueue shutdownFn func() // to be called prior to shutdown or nil - eventClient *eventbus.Client - healthChangeSub *eventbus.Subscriber[health.Change] - subsDoneCh chan struct{} // close-only channel when eventClient has closed + eventSubs eventbus.Monitor mu sync.Mutex // mutex guards the following fields @@ -195,11 +193,11 @@ func NewNoStart(opts Options) (_ *Auto, err error) { updateDone: make(chan struct{}), observer: opts.Observer, shutdownFn: opts.Shutdown, - subsDoneCh: make(chan struct{}), } - c.eventClient = opts.Bus.Client("controlClient.Auto") - c.healthChangeSub = eventbus.Subscribe[health.Change](c.eventClient) + // Set up eventbus client and subscriber + cli := opts.Bus.Client("controlClient.Auto") + c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) c.authCtx, c.authCancel = context.WithCancel(context.Background()) c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf) @@ -207,7 +205,6 @@ func NewNoStart(opts Options) (_ *Auto, err error) { c.mapCtx, c.mapCancel = context.WithCancel(context.Background()) c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf) - go c.consumeEventbusTopics() return c, nil } @@ -216,16 +213,17 @@ func NewNoStart(opts Options) (_ *Auto, err error) { // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the // [eventbus.Client] is closed. -func (c *Auto) consumeEventbusTopics() { - defer close(c.subsDoneCh) - - for { - select { - case <-c.eventClient.Done(): - return - case change := <-c.healthChangeSub.Events(): - if change.WarnableChanged { - c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState) +func (c *Auto) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { + healthChangeSub := eventbus.Subscribe[health.Change](cli) + return func(cli *eventbus.Client) { + for { + select { + case <-cli.Done(): + return + case change := <-healthChangeSub.Events(): + if change.WarnableChanged { + c.direct.ReportWarnableChange(change.Warnable, change.UnhealthyState) + } } } } @@ -784,8 +782,7 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) { } func (c *Auto) Shutdown() { - c.eventClient.Close() - <-c.subsDoneCh + c.eventSubs.Close() c.mu.Lock() if c.closed {