util/eventbus: [DRAFT] add sketch of Subscribe with funcs

Updates #DRAFT

Change-Id: Id1f208bdd55a9ae4eccc07afc44eade6e67db5bb
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Brad Fitzpatrick 2025-10-02 13:51:02 -07:00
parent f42be719de
commit abb4c7ec18
4 changed files with 85 additions and 88 deletions

@@ -548,7 +548,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
b.prevIfState = netMon.InterfaceState()
// Call our linkChange code once with the current state.
// Following changes are triggered via the eventbus.
b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()})
b.linkChange(netmon.ChangeDelta{New: netMon.InterfaceState()})
if buildfeatures.HasPeerAPIServer {
if tunWrap, ok := b.sys.Tun.GetOK(); ok {
@@ -573,50 +573,17 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
// Start the event bus late, once all the assignments above are done.
// (See previous race in tailscale/tailscale#17252)
ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec))
eventbus.SubscribeFunc(ec, b.onClientVersion)
eventbus.SubscribeFunc(ec, b.onTailnetDefaultAutoUpdateEvent)
eventbus.SubscribeFunc(ec, b.onHealthChange)
eventbus.SubscribeFunc(ec, b.linkChange)
if buildfeatures.HasPortList {
eventbus.SubscribeFunc(ec, b.setPortlistServices)
}
return b, nil
}
// consumeEventbusTopics consumes events from all relevant
// [eventbus.Subscriber]s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) {
clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec)
autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec)
healthChangeSub := eventbus.Subscribe[health.Change](ec)
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
var portlist <-chan PortlistServices
if buildfeatures.HasPortList {
portlistSub := eventbus.Subscribe[PortlistServices](ec)
portlist = portlistSub.Events()
}
return func(ec *eventbus.Client) {
for {
select {
case <-ec.Done():
return
case clientVersion := <-clientVersionSub.Events():
b.onClientVersion(&clientVersion)
case au := <-autoUpdateSub.Events():
b.onTailnetDefaultAutoUpdate(au.Value)
case change := <-healthChangeSub.Events():
b.onHealthChange(change)
case changeDelta := <-changeDeltaSub.Events():
b.linkChange(&changeDelta)
case pl := <-portlist:
if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
b.setPortlistServices(pl)
}
}
}
}
}
func (b *LocalBackend) Clock() tstime.Clock { return b.clock }
func (b *LocalBackend) Sys() *tsd.System { return b.sys }
@@ -933,7 +900,7 @@ func (b *LocalBackend) DisconnectControl() {
}
// linkChange is our network monitor callback, called whenever the network changes.
func (b *LocalBackend) linkChange(delta *netmon.ChangeDelta) {
func (b *LocalBackend) linkChange(delta netmon.ChangeDelta) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -3399,7 +3366,8 @@ func (b *LocalBackend) tellRecipientToBrowseToURL(url string, recipient notifica
// onClientVersion is called on MapResponse updates when a MapResponse contains
// a non-nil ClientVersion message.
func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) {
func (b *LocalBackend) onClientVersion(cv tailcfg.ClientVersion) {
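// The fields below want a *tailcfg.ClientVersion, so take the address
// of the by-value event.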
v := &cv
b.mu.Lock()
b.lastClientVersion = v
b.health.SetLatestVersion(v)
@@ -3407,6 +3375,10 @@ func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) {
b.send(ipn.Notify{ClientVersion: v})
}
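// onTailnetDefaultAutoUpdateEvent is the eventbus adapter for
// onTailnetDefaultAutoUpdate, unwrapping the event's bool value.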
func (b *LocalBackend) onTailnetDefaultAutoUpdateEvent(a controlclient.AutoUpdate) {
b.onTailnetDefaultAutoUpdate(a.Value)
}
func (b *LocalBackend) onTailnetDefaultAutoUpdate(au bool) {
unlock := b.lockAndGetUnlock()
defer unlock()
@@ -4708,7 +4680,7 @@ func (b *LocalBackend) peerAPIServicesLocked() (ret []tailcfg.Service) {
// to advertise the running services on the host.
type PortlistServices []tailcfg.Service
func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) {
func (b *LocalBackend) setPortlistServices(sl PortlistServices) {
if !buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
return
}

@@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
return s
}
// SubscribeFunc is like [Subscribe] but calls the provided func
// for each event of type T.
//
// The func is not called from a new goroutine. It is called
// from the eventbus's goroutine.
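// Handlers for a client are invoked one at a time, in the order events
// are received; a func that blocks delays delivery of later events.
//
// A minimal usage sketch (the Thing type, bus variable, and "example"
// client name are hypothetical; the publish side assumes the existing
// [Publish] API):
//
//	type Thing struct{ N int }
//	c := bus.Client("example")
//	sub := eventbus.SubscribeFunc(c, func(t Thing) {
//		log.Printf("got Thing{N: %d}", t.N) // runs on the bus goroutine
//	})
//	defer sub.Close()
//	eventbus.Publish[Thing](c).Publish(Thing{N: 1})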
func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
c.mu.Lock()
defer c.mu.Unlock()
if c.isClosed() {
panic("cannot SubscribeFunc on a closed client")
}
r := c.subscribeStateLocked()
s := newSubscriberFunc(r, f)
r.addSubscriber(s)
return &SubscriberFunc[T]{s: s}
}
// Publish returns a publisher for event type T using the given client.
// It panics if c is closed.
func Publish[T any](c *Client) *Publisher[T] {

@@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct {
stop stopFlag
read chan T
read chan T // mutually exclusive with readFunc
readFunc func(T) // mutually exclusive with read
unregister func()
}
// SubscriberFunc is like [Subscriber] but has no channel
// for delivery. It is returned by [SubscribeFunc].
type SubscriberFunc[T any] struct {
s *Subscriber[T] // but don't use its Events or Done methods
}
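// Close closes the wrapped [Subscriber], unregistering it from its
// client and stopping further deliveries to the subscribed func.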
func (s *SubscriberFunc[T]) Close() {
s.s.Close()
}
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
return &Subscriber[T]{
read: make(chan T),
@@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
}
}
func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] {
return &Subscriber[T]{
readFunc: f,
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
}
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
@@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T)
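// Func-based subscribers (from [SubscribeFunc]) have no read channel:
// service any pending snapshot request, then invoke the callback
// synchronously on this dispatch goroutine and drop the queued event.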
if s.readFunc != nil {
select {
case <-ctx.Done():
return false
case ch := <-snapshot:
ch <- vals.Snapshot()
default:
}
s.readFunc(t)
vals.Drop()
return true
}
for {
// Keep the cases in this select in sync with subscribeState.pump
// above. The only difference should be that this select

@@ -156,7 +156,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
eventSubs eventbus.Monitor
eventBusClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@@ -631,43 +631,6 @@ func newConn(logf logger.Logf) *Conn {
return c
}
// consumeEventbusTopics consumes events from all [Conn]-relevant
// [eventbus.Subscriber]s and passes them to their related handler. Events are
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
// Subscribe calls must return before NewConn otherwise published
// events can be missed.
pmSub := eventbus.Subscribe[portmappertype.Mapping](cli)
filterSub := eventbus.Subscribe[FilterUpdate](cli)
nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli)
nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli)
syncSub := eventbus.Subscribe[syncPoint](cli)
allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli)
return func(cli *eventbus.Client) {
for {
select {
case <-cli.Done():
return
case <-pmSub.Events():
c.onPortMapChanged()
case filterUpdate := <-filterSub.Events():
c.onFilterUpdate(filterUpdate)
case nodeViews := <-nodeViewsSub.Events():
c.onNodeViewsUpdate(nodeViews)
case nodeMuts := <-nodeMutsSub.Events():
c.onNodeMutationsUpdate(nodeMuts)
case syncPoint := <-syncSub.Events():
c.dlogf("magicsock: received sync point after reconfig")
syncPoint.Signal()
case allocResp := <-allocRelayEndpointSub.Events():
c.onUDPRelayAllocResp(allocResp)
}
}
}
}
func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -732,10 +695,10 @@ func NewConn(opts Options) (*Conn, error) {
// Set up publishers and subscribers. Subscribe calls must return before
// NewConn otherwise published events can be missed.
cli := c.eventBus.Client("magicsock.Conn")
c.syncPub = eventbus.Publish[syncPoint](cli)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
ec := c.eventBus.Client("magicsock.Conn")
c.eventBusClient = ec
c.syncPub = eventbus.Publish[syncPoint](ec)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
c.donec = c.connCtx.Done()
@@ -791,6 +754,17 @@ func NewConn(opts Options) (*Conn, error) {
}
c.logf("magicsock: disco key = %v", c.discoShort)
eventbus.SubscribeFunc(ec, func(portmappertype.Mapping) { c.onPortMapChanged() })
eventbus.SubscribeFunc(ec, c.onFilterUpdate)
eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate)
eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp)
eventbus.SubscribeFunc(ec, func(p syncPoint) {
c.dlogf("magicsock: received sync point after reconfig")
p.Signal()
})
return c, nil
}
@@ -3317,7 +3291,7 @@ func (c *Conn) Close() error {
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
c.eventSubs.Close()
c.eventBusClient.Close()
c.mu.Lock()
defer c.mu.Unlock()