From 2a3d67e9b78a7f8d9a2f20ebcc8658f409fe4d1a Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 7 Oct 2025 16:50:34 -0700 Subject: [PATCH] wgengine: use eventbus.SubscribeFunc in userspaceEngine Updates #15160 Updates #17487 Change-Id: Id852098c4f9c2fdeab9151b0b8c14dceff73b99d Signed-off-by: M. J. Fromberger --- wgengine/userspace.go | 39 +++++++++++---------------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/wgengine/userspace.go b/wgengine/userspace.go index b8a136da7..fa2379288 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -94,9 +94,8 @@ const networkLoggerUploadTimeout = 5 * time.Second type userspaceEngine struct { // eventBus will eventually become required, but for now may be nil. - // TODO(creachadair): Enforce that this is non-nil at construction. - eventBus *eventbus.Bus - eventSubs eventbus.Monitor + eventBus *eventbus.Bus + eventClient *eventbus.Client logf logger.Logf wgLogger *wglog.Logger // a wireguard-go logging wrapper @@ -539,34 +538,18 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } } - cli := e.eventBus.Client("userspaceEngine") - e.eventSubs = cli.Monitor(e.consumeEventbusTopics(cli)) + ec := e.eventBus.Client("userspaceEngine") + eventbus.SubscribeFunc(ec, func(cd netmon.ChangeDelta) { + if f, ok := feature.HookProxyInvalidateCache.GetOk(); ok { + f() + } + e.linkChange(&cd) + }) + e.eventClient = ec e.logf("Engine created.") return e, nil } -// consumeEventbusTopics consumes events from all relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { - changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli) - return func(cli *eventbus.Client) { - for { - select { - case <-cli.Done(): - return - case changeDelta := <-changeDeltaSub.Events(): - if f, ok := feature.HookProxyInvalidateCache.GetOk(); ok { - f() - } - e.linkChange(&changeDelta) - } - } - } -} - // echoRespondToAll is an inbound post-filter responding to all echo requests. func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) { if p.IsEchoRequest() { @@ -1257,7 +1240,7 @@ func (e *userspaceEngine) RequestStatus() { } func (e *userspaceEngine) Close() { - e.eventSubs.Close() + e.eventClient.Close() e.mu.Lock() if e.closing { e.mu.Unlock()