ipn/ipnlocal: use eventbus.Monitor in expiryManager (#17204)

This commit does not change the order or meaning of any eventbus activity, it
only updates the way the plumbing is set up.

Updates #15160

Change-Id: I0a175e67e867459daaedba0731bf68bd331e5ebc
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-09-19 14:31:55 -07:00 committed by GitHub
parent 2b6bc11586
commit f9c699812a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -43,9 +43,7 @@ type expiryManager struct {
logf logger.Logf logf logger.Logf
clock tstime.Clock clock tstime.Clock
eventClient *eventbus.Client eventSubs eventbus.Monitor
controlTimeSub *eventbus.Subscriber[controlclient.ControlTime]
subsDoneCh chan struct{} // closed when consumeEventbusTopics returns
} }
func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager { func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager {
@ -55,12 +53,8 @@ func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager {
clock: tstime.StdClock{}, clock: tstime.StdClock{},
} }
em.eventClient = bus.Client("ipnlocal.expiryManager") cli := bus.Client("ipnlocal.expiryManager")
em.controlTimeSub = eventbus.Subscribe[controlclient.ControlTime](em.eventClient) em.eventSubs = cli.Monitor(em.consumeEventbusTopics(cli))
em.subsDoneCh = make(chan struct{})
go em.consumeEventbusTopics()
return em return em
} }
@ -69,17 +63,18 @@ func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager {
// always handled in the order they are received, i.e. the next event is not // always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the // read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed. // [eventbus.Client] is closed.
func (em *expiryManager) consumeEventbusTopics() { func (em *expiryManager) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
defer close(em.subsDoneCh) controlTimeSub := eventbus.Subscribe[controlclient.ControlTime](cli)
return func(cli *eventbus.Client) {
for { for {
select { select {
case <-em.eventClient.Done(): case <-cli.Done():
return return
case time := <-em.controlTimeSub.Events(): case time := <-controlTimeSub.Events():
em.onControlTime(time.Value) em.onControlTime(time.Value)
} }
} }
}
} }
// onControlTime is called whenever we receive a new timestamp from the control // onControlTime is called whenever we receive a new timestamp from the control
@ -250,10 +245,7 @@ func (em *expiryManager) nextPeerExpiry(nm *netmap.NetworkMap, localNow time.Tim
return nextExpiry return nextExpiry
} }
func (em *expiryManager) close() { func (em *expiryManager) close() { em.eventSubs.Close() }
em.eventClient.Close()
<-em.subsDoneCh
}
// ControlNow estimates the current time on the control server, calculated as // ControlNow estimates the current time on the control server, calculated as
// localNow + the delta between local and control server clocks as recorded // localNow + the delta between local and control server clocks as recorded