diff --git a/ipn/ipnlocal/expiry.go b/ipn/ipnlocal/expiry.go index 849e28610..8ea63d21a 100644 --- a/ipn/ipnlocal/expiry.go +++ b/ipn/ipnlocal/expiry.go @@ -43,7 +43,7 @@ type expiryManager struct { logf logger.Logf clock tstime.Clock - eventSubs eventbus.Monitor + eventClient *eventbus.Client } func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager { @@ -53,30 +53,13 @@ func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager { clock: tstime.StdClock{}, } - cli := bus.Client("ipnlocal.expiryManager") - em.eventSubs = cli.Monitor(em.consumeEventbusTopics(cli)) + em.eventClient = bus.Client("ipnlocal.expiryManager") + eventbus.SubscribeFunc(em.eventClient, func(ct controlclient.ControlTime) { + em.onControlTime(ct.Value) + }) return em } -// consumeEventbusTopics consumes events from all relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (em *expiryManager) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { - controlTimeSub := eventbus.Subscribe[controlclient.ControlTime](cli) - return func(cli *eventbus.Client) { - for { - select { - case <-cli.Done(): - return - case time := <-controlTimeSub.Events(): - em.onControlTime(time.Value) - } - } - } -} - // onControlTime is called whenever we receive a new timestamp from the control // server to store the delta. func (em *expiryManager) onControlTime(t time.Time) { @@ -245,7 +228,7 @@ func (em *expiryManager) nextPeerExpiry(nm *netmap.NetworkMap, localNow time.Tim return nextExpiry } -func (em *expiryManager) close() { em.eventSubs.Close() } +func (em *expiryManager) close() { em.eventClient.Close() } // ControlNow estimates the current time on the control server, calculated as // localNow + the delta between local and control server clocks as recorded