6 Commits

Author SHA1 Message Date
M. J. Fromberger
df747f1c1b
util/eventbus: add a Done method to the Monitor type (#17263)
Some systems need to tell whether the monitored goroutine has finished
alongside other channel operations (notably in this case the relay server, but
there seem likely to be others similarly situated).

Updates #15160

Change-Id: I5f0f3fae827b07f9b7102a3b08f60cda9737fe28
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-09-24 09:14:41 -07:00
M. J. Fromberger
e59fbaab64
util/eventbus: give a nicer error when attempting to use a closed client (#17208)
It is a programming error to Publish or Subscribe on a closed Client, but now
the way you discover that is by getting a panic from down in the machinery of
the bus after the client state has been cleaned up.

To provide a more helpful error, let's panic explicitly when that happens and
say what went wrong ("the client is closed"), by preventing subscriptions from
interleaving with closure of the client. With this change, either an attachment
fails outright (because the client is already closed) or completes and then
shuts down in good order in the normal course.

This does not change the semantics of the client, publishers, or subscribers,
it's just making the failure more eager so we can attach explanatory text.

Updates #15160

Change-Id: Ia492f4c1dea7535aec2cdcc2e5ea5410ed5218d2
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-09-22 07:07:57 -07:00
M. J. Fromberger
ca9d795006
util/eventbus: add a Monitor type to manage subscriber goroutines (#17127)
A common pattern in event bus usage is to run a goroutine to service a
collection of subscribers on a single bus client. To have an orderly shutdown,
however, we need a way to wait for such a goroutine to be finished.

This commit adds a Monitor type that makes this pattern easier to wire up:
rather than having to track all the subscribers and an extra channel, the
component need only track the client and the monitor.  For example:

   cli := bus.Client("example")
   m := cli.Monitor(func(c *eventbus.Client) {
     s1 := eventbus.Subscribe[T](cli)
     s2 := eventbus.Subscribe[U](cli)
     for {
       select {
       case <-c.Done():
         return
       case t := <-s1.Events():
          processT(t)
       case u := <-s2.Events():
          processU(u)
       }
     }
   })

To shut down the client and wait for the goroutine, the caller can write:

   m.Close()

which closes cli and waits for the goroutine to finish. Or, separately:

   cli.Close()
   // do other stuff
   m.Wait()

While the goroutine management is not explicitly tied to subscriptions, it is a
common enough pattern that this seems like a useful simplification in use.

Updates #15160

Change-Id: I657afda1cfaf03465a9dce1336e9fd518a968bca
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-09-19 12:34:06 -07:00
M. J. Fromberger
5b5ae2b2ee
util/eventbus: add a Done channel to the Client (#17118)
Subscribers already have a Done channel that the caller can use to detect when
the subscriber has been closed. Typically this happens when the governing
Client closes, which in turn is typically because the Bus closed.

But clients and subscribers can stop at other times too, and a caller has no
good way to tell the difference between "this subscriber closed but the rest
are OK" and "the client closed and all these subscribers are finished".

We've worked around this in practice by knowing the closure of one subscriber
implies the fate of the rest, but we can do better: Add a Done method to the
Client that allows us to tell when that has been closed explicitly, after all
the publishers and subscribers associated with that client have been closed.
This allows the caller to be sure that, by the time that occurs, no further
pending events are forthcoming on that client.

Updates #15160

Change-Id: Id601a79ba043365ecdb47dd035f1fdadd984f303
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-09-16 07:44:08 -07:00
David Anderson
3e18434595 util/eventbus: rework to have a Client abstraction
The Client carries both publishers and subscribers for a single
actor. This makes the APIs for publish and subscribe look more
similar, and this structure is a better fit for upcoming debug
facilities.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
2025-03-04 17:38:20 -08:00
David Anderson
ef906763ee util/eventbus: initial implementation of an in-process event bus
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
Co-authored-by: M. J. Fromberger <fromberger@tailscale.com>
2025-02-28 13:45:43 -08:00