mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-16 14:52:18 +01:00
feature,ipn/ipnlocal,wgengine: improve how eventbus shutdown is handled (#17156)
Instead of waiting for a designated subscription to close as a canary for the bus being stopped, use the bus Client's own signal for closure added in #17118. Updates #cleanup Change-Id: I384ea39f3f1f6a030a6282356f7b5bdcdf8d7102 Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
parent
b63f5d7e7d
commit
8608e42103
@ -157,9 +157,7 @@ func (e *extension) consumeEventbusTopics(port int) {
|
|||||||
select {
|
select {
|
||||||
case <-e.disconnectFromBusCh:
|
case <-e.disconnectFromBusCh:
|
||||||
return
|
return
|
||||||
case <-reqSub.Done():
|
case <-eventClient.Done():
|
||||||
// If reqSub is done, the eventClient has been closed, which is a
|
|
||||||
// signal to return.
|
|
||||||
return
|
return
|
||||||
case req := <-reqSub.Events():
|
case req := <-reqSub.Events():
|
||||||
if rs == nil {
|
if rs == nil {
|
||||||
|
|||||||
@ -68,15 +68,13 @@ func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager {
|
|||||||
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
||||||
// always handled in the order they are received, i.e. the next event is not
|
// always handled in the order they are received, i.e. the next event is not
|
||||||
// read until the previous event's handler has returned. It returns when the
|
// read until the previous event's handler has returned. It returns when the
|
||||||
// [controlclient.ControlTime] subscriber is closed, which is interpreted to be the
|
// [eventbus.Client] is closed.
|
||||||
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
|
|
||||||
// all open or all closed).
|
|
||||||
func (em *expiryManager) consumeEventbusTopics() {
|
func (em *expiryManager) consumeEventbusTopics() {
|
||||||
defer close(em.subsDoneCh)
|
defer close(em.subsDoneCh)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-em.controlTimeSub.Done():
|
case <-em.eventClient.Done():
|
||||||
return
|
return
|
||||||
case time := <-em.controlTimeSub.Events():
|
case time := <-em.controlTimeSub.Events():
|
||||||
em.onControlTime(time.Value)
|
em.onControlTime(time.Value)
|
||||||
|
|||||||
@ -619,18 +619,13 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
|||||||
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
||||||
// always handled in the order they are received, i.e. the next event is not
|
// always handled in the order they are received, i.e. the next event is not
|
||||||
// read until the previous event's handler has returned. It returns when the
|
// read until the previous event's handler has returned. It returns when the
|
||||||
// [tailcfg.ClientVersion] subscriber is closed, which is interpreted to be the
|
// [eventbus.Client] is closed.
|
||||||
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
|
|
||||||
// all open or all closed).
|
|
||||||
func (b *LocalBackend) consumeEventbusTopics() {
|
func (b *LocalBackend) consumeEventbusTopics() {
|
||||||
defer close(b.subsDoneCh)
|
defer close(b.subsDoneCh)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// TODO(cmol): Move to using b.eventClient.Done() once implemented.
|
case <-b.eventClient.Done():
|
||||||
// In the meantime, we rely on the subs not going away until the client is
|
|
||||||
// closed, closing all its subscribers.
|
|
||||||
case <-b.clientVersionSub.Done():
|
|
||||||
return
|
return
|
||||||
case clientVersion := <-b.clientVersionSub.Events():
|
case clientVersion := <-b.clientVersionSub.Events():
|
||||||
b.onClientVersion(&clientVersion)
|
b.onClientVersion(&clientVersion)
|
||||||
|
|||||||
@ -640,15 +640,13 @@ func newConn(logf logger.Logf) *Conn {
|
|||||||
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
||||||
// always handled in the order they are received, i.e. the next event is not
|
// always handled in the order they are received, i.e. the next event is not
|
||||||
// read until the previous event's handler has returned. It returns when the
|
// read until the previous event's handler has returned. It returns when the
|
||||||
// [portmapper.Mapping] subscriber is closed, which is interpreted to be the
|
// [eventbus.Client] is closed.
|
||||||
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
|
|
||||||
// all open or all closed).
|
|
||||||
func (c *Conn) consumeEventbusTopics() {
|
func (c *Conn) consumeEventbusTopics() {
|
||||||
defer close(c.subsDoneCh)
|
defer close(c.subsDoneCh)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.pmSub.Done():
|
case <-c.eventClient.Done():
|
||||||
return
|
return
|
||||||
case <-c.pmSub.Events():
|
case <-c.pmSub.Events():
|
||||||
c.onPortMapChanged()
|
c.onPortMapChanged()
|
||||||
|
|||||||
@ -158,13 +158,11 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon
|
|||||||
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
|
||||||
// always handled in the order they are received, i.e. the next event is not
|
// always handled in the order they are received, i.e. the next event is not
|
||||||
// read until the previous event's handler has returned. It returns when the
|
// read until the previous event's handler has returned. It returns when the
|
||||||
// [portmapper.Mapping] subscriber is closed, which is interpreted to be the
|
// [eventbus.Client] is closed.
|
||||||
// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either
|
|
||||||
// all open or all closed).
|
|
||||||
func (r *linuxRouter) consumeEventbusTopics() {
|
func (r *linuxRouter) consumeEventbusTopics() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-r.ruleDeletedSub.Done():
|
case <-r.eventClient.Done():
|
||||||
return
|
return
|
||||||
case rulesDeleted := <-r.ruleDeletedSub.Events():
|
case rulesDeleted := <-r.ruleDeletedSub.Events():
|
||||||
r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority)
|
r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user