From abb4c7ec18f4131c1709a6ec806cbfb006a5c1ea Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 2 Oct 2025 13:51:02 -0700 Subject: [PATCH] util/eventbus: [DRAFT] add sketch of Subscribe with funcs Updates #DRAFT Change-Id: Id1f208bdd55a9ae4eccc07afc44eade6e67db5bb Signed-off-by: Brad Fitzpatrick --- ipn/ipnlocal/local.go | 60 +++++++++------------------------ util/eventbus/client.go | 20 +++++++++++ util/eventbus/subscribe.go | 33 +++++++++++++++++- wgengine/magicsock/magicsock.go | 60 ++++++++++----------------------- 4 files changed, 85 insertions(+), 88 deletions(-) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 7488a06a9..3aa625ecc 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -548,7 +548,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo b.prevIfState = netMon.InterfaceState() // Call our linkChange code once with the current state. // Following changes are triggered via the eventbus. - b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()}) + b.linkChange(netmon.ChangeDelta{New: netMon.InterfaceState()}) if buildfeatures.HasPeerAPIServer { if tunWrap, ok := b.sys.Tun.GetOK(); ok { @@ -573,50 +573,17 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo // Start the event bus late, once all the assignments above are done. // (See previous race in tailscale/tailscale#17252) ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") - b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec)) + eventbus.SubscribeFunc(ec, b.onClientVersion) + eventbus.SubscribeFunc(ec, b.onTailnetDefaultAutoUpdateEvent) + eventbus.SubscribeFunc(ec, b.onHealthChange) + eventbus.SubscribeFunc(ec, b.linkChange) + if buildfeatures.HasPortList { + eventbus.SubscribeFunc(ec, b.setPortlistServices) + } return b, nil } -// consumeEventbusTopics consumes events from all relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) { - clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec) - autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec) - healthChangeSub := eventbus.Subscribe[health.Change](ec) - changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec) - - var portlist <-chan PortlistServices - if buildfeatures.HasPortList { - portlistSub := eventbus.Subscribe[PortlistServices](ec) - portlist = portlistSub.Events() - } - - return func(ec *eventbus.Client) { - for { - select { - case <-ec.Done(): - return - case clientVersion := <-clientVersionSub.Events(): - b.onClientVersion(&clientVersion) - case au := <-autoUpdateSub.Events(): - b.onTailnetDefaultAutoUpdate(au.Value) - case change := <-healthChangeSub.Events(): - b.onHealthChange(change) - case changeDelta := <-changeDeltaSub.Events(): - b.linkChange(&changeDelta) - case pl := <-portlist: - if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans - b.setPortlistServices(pl) - } - } - } - } -} - func (b *LocalBackend) Clock() tstime.Clock { return b.clock } func (b *LocalBackend) Sys() *tsd.System { return b.sys } @@ -933,7 +900,7 @@ func (b *LocalBackend) DisconnectControl() { } // linkChange is our network monitor callback, called whenever the network changes. -func (b *LocalBackend) linkChange(delta *netmon.ChangeDelta) { +func (b *LocalBackend) linkChange(delta netmon.ChangeDelta) { b.mu.Lock() defer b.mu.Unlock() @@ -3399,7 +3366,8 @@ func (b *LocalBackend) tellRecipientToBrowseToURL(url string, recipient notifica // onClientVersion is called on MapResponse updates when a MapResponse contains // a non-nil ClientVersion message. -func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) { +func (b *LocalBackend) onClientVersion(cv tailcfg.ClientVersion) { + v := &cv b.mu.Lock() b.lastClientVersion = v b.health.SetLatestVersion(v) @@ -3407,6 +3375,10 @@ func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) { b.send(ipn.Notify{ClientVersion: v}) } +func (b *LocalBackend) onTailnetDefaultAutoUpdateEvent(a controlclient.AutoUpdate) { + b.onTailnetDefaultAutoUpdate(a.Value) +} + func (b *LocalBackend) onTailnetDefaultAutoUpdate(au bool) { unlock := b.lockAndGetUnlock() defer unlock() @@ -4708,7 +4680,7 @@ func (b *LocalBackend) peerAPIServicesLocked() (ret []tailcfg.Service) { // to advertise the running services on the host. type PortlistServices []tailcfg.Service -func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) { +func (b *LocalBackend) setPortlistServices(sl PortlistServices) { if !buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans return } diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 7c0268886..b5333292a 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] { return s } +// SubscribeFunc is like [Subscribe] but calls the provided func +// for each event of type T. +// +// The func is not called from a new goroutine. It is called +// from the eventbus's goroutine. +func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { + c.mu.Lock() + defer c.mu.Unlock() + + if c.isClosed() { + panic("cannot SubscribeFunc on a closed client") + } + + r := c.subscribeStateLocked() + s := newSubscriberFunc(r, f) + r.addSubscriber(s) + + return &SubscriberFunc[T]{s: s} +} + // Publish returns a publisher for event type T using the given client. // It panics if c is closed. func Publish[T any](c *Client) *Publisher[T] { diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ef155e621..cfa810daa 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} { // A Subscriber delivers one type of event from a [Client]. type Subscriber[T any] struct { stop stopFlag - read chan T + read chan T // mutually exclusive with readFunc + readFunc func(T) // mutually exclusive with read unregister func() } +// SubscriberFunc is like [Subscriber] but has no channel +// for delivery. They're returned by [SubscribeFunc]. +type SubscriberFunc[T any] struct { + s *Subscriber[T] // but don't use its Events or Done methods +} + +func (s *SubscriberFunc[T]) Close() { + s.s.Close() +} + func newSubscriber[T any](r *subscribeState) *Subscriber[T] { return &Subscriber[T]{ read: make(chan T), @@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] { } } +func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] { + return &Subscriber[T]{ + readFunc: f, + unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + } +} + func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { ret := &Subscriber[T]{ read: make(chan T, 100), // arbitrary, large @@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) + + if s.readFunc != nil { + select { + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + default: + } + s.readFunc(t) + vals.Drop() + return true + } for { // Keep the cases in this select in sync with subscribeState.pump // above. The only different should be that this select diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index e3cf249c5..db93ec0bb 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -156,7 +156,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus - eventSubs eventbus.Monitor + eventBusClient *eventbus.Client logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -631,43 +631,6 @@ func newConn(logf logger.Logf) *Conn { return c } -// consumeEventbusTopics consumes events from all [Conn]-relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - pmSub := eventbus.Subscribe[portmappertype.Mapping](cli) - filterSub := eventbus.Subscribe[FilterUpdate](cli) - nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli) - nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli) - syncSub := eventbus.Subscribe[syncPoint](cli) - allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli) - return func(cli *eventbus.Client) { - for { - select { - case <-cli.Done(): - return - case <-pmSub.Events(): - c.onPortMapChanged() - case filterUpdate := <-filterSub.Events(): - c.onFilterUpdate(filterUpdate) - case nodeViews := <-nodeViewsSub.Events(): - c.onNodeViewsUpdate(nodeViews) - case nodeMuts := <-nodeMutsSub.Events(): - c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() - case allocResp := <-allocRelayEndpointSub.Events(): - c.onUDPRelayAllocResp(allocResp) - } - } - } -} - func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) { c.mu.Lock() defer c.mu.Unlock() @@ -732,10 +695,10 @@ func NewConn(opts Options) (*Conn, error) { // Set up publishers and subscribers. Subscribe calls must return before // NewConn otherwise published events can be missed. - cli := c.eventBus.Client("magicsock.Conn") - c.syncPub = eventbus.Publish[syncPoint](cli) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) - c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) + ec := c.eventBus.Client("magicsock.Conn") + c.eventBusClient = ec + c.syncPub = eventbus.Publish[syncPoint](ec) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.donec = c.connCtx.Done() @@ -791,6 +754,17 @@ func NewConn(opts Options) (*Conn, error) { } c.logf("magicsock: disco key = %v", c.discoShort) + + eventbus.SubscribeFunc(ec, func(portmappertype.Mapping) { c.onPortMapChanged() }) + eventbus.SubscribeFunc(ec, c.onFilterUpdate) + eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate) + eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate) + eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp) + eventbus.SubscribeFunc(ec, func(p syncPoint) { + c.dlogf("magicsock: received sync point after reconfig") + p.Signal() + }) + return c, nil } @@ -3317,7 +3291,7 @@ func (c *Conn) Close() error { // deadlock with c.Close(). // 2. Conn.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress Conn.Close() behaviors. - c.eventSubs.Close() + c.eventBusClient.Close() c.mu.Lock() defer c.mu.Unlock()