wgengine: use eventbus.SubscribeFunc in userspaceEngine

Updates #15160
Updates #17487

Change-Id: Id852098c4f9c2fdeab9151b0b8c14dceff73b99d
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-10-07 16:50:34 -07:00 committed by M. J. Fromberger
parent 2d1014ead1
commit 2a3d67e9b7

View File

@ -94,9 +94,8 @@ const networkLoggerUploadTimeout = 5 * time.Second
type userspaceEngine struct {
// eventBus will eventually become required, but for now may be nil.
// TODO(creachadair): Enforce that this is non-nil at construction.
eventBus *eventbus.Bus
eventSubs eventbus.Monitor
eventBus *eventbus.Bus
eventClient *eventbus.Client
logf logger.Logf
wgLogger *wglog.Logger // a wireguard-go logging wrapper
@ -539,34 +538,18 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
}
}
cli := e.eventBus.Client("userspaceEngine")
e.eventSubs = cli.Monitor(e.consumeEventbusTopics(cli))
ec := e.eventBus.Client("userspaceEngine")
eventbus.SubscribeFunc(ec, func(cd netmon.ChangeDelta) {
if f, ok := feature.HookProxyInvalidateCache.GetOk(); ok {
f()
}
e.linkChange(&cd)
})
e.eventClient = ec
e.logf("Engine created.")
return e, nil
}
// consumeEventbusTopics consumes events from all relevant
// [eventbus.Subscriber]'s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli)
return func(cli *eventbus.Client) {
for {
select {
case <-cli.Done():
return
case changeDelta := <-changeDeltaSub.Events():
if f, ok := feature.HookProxyInvalidateCache.GetOk(); ok {
f()
}
e.linkChange(&changeDelta)
}
}
}
}
// echoRespondToAll is an inbound post-filter responding to all echo requests.
func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
if p.IsEchoRequest() {
@ -1257,7 +1240,7 @@ func (e *userspaceEngine) RequestStatus() {
}
func (e *userspaceEngine) Close() {
e.eventSubs.Close()
e.eventClient.Close()
e.mu.Lock()
if e.closing {
e.mu.Unlock()