From ca9d79500615082dc46fffc4b1d93ad66fa6b8eb Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 19 Sep 2025 12:34:06 -0700 Subject: [PATCH] util/eventbus: add a Monitor type to manage subscriber goroutines (#17127) A common pattern in event bus usage is to run a goroutine to service a collection of subscribers on a single bus client. To have an orderly shutdown, however, we need a way to wait for such a goroutine to be finished. This commit adds a Monitor type that makes this pattern easier to wire up: rather than having to track all the subscribers and an extra channel, the component need only track the client and the monitor. For example: cli := bus.Client("example") m := cli.Monitor(func(c *eventbus.Client) { s1 := eventbus.Subscribe[T](cli) s2 := eventbus.Subscribe[U](cli) for { select { case <-c.Done(): return case t := <-s1.Events(): processT(t) case u := <-s2.Events(): processU(u) } } }) To shut down the client and wait for the goroutine, the caller can write: m.Close() which closes cli and waits for the goroutine to finish. Or, separately: cli.Close() // do other stuff m.Wait() While the goroutine management is not explicitly tied to subscriptions, it is a common enough pattern that this seems like a useful simplification in use. Updates #15160 Change-Id: I657afda1cfaf03465a9dce1336e9fd518a968bca Signed-off-by: M. J. Fromberger --- util/eventbus/bus_test.go | 73 +++++++++++++++++++++++++++++++++++++++ util/eventbus/monitor.go | 42 ++++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 util/eventbus/monitor.go diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 9fd0e4409..7782634ae 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -221,6 +221,79 @@ func TestClient_Done(t *testing.T) { } } +func TestMonitor(t *testing.T) { + t.Run("ZeroWait", func(t *testing.T) { + var zero eventbus.Monitor + + ready := make(chan struct{}) + go func() { zero.Wait(); close(ready) }() + + select { + case <-ready: + // OK + case <-time.After(time.Second): + t.Fatal("timeout waiting for Wait to return") + } + }) + + t.Run("ZeroClose", func(t *testing.T) { + var zero eventbus.Monitor + + ready := make(chan struct{}) + go func() { zero.Close(); close(ready) }() + + select { + case <-ready: + // OK + case <-time.After(time.Second): + t.Fatal("timeout waiting for Close to return") + } + }) + + testMon := func(t *testing.T, release func(*eventbus.Client, eventbus.Monitor)) func(t *testing.T) { + t.Helper() + return func(t *testing.T) { + bus := eventbus.New() + cli := bus.Client("test client") + + // The monitored goroutine runs until the client or test subscription ends. + m := cli.Monitor(func(c *eventbus.Client) { + sub := eventbus.Subscribe[string](cli) + select { + case <-c.Done(): + t.Log("client closed") + case <-sub.Done(): + t.Log("subscription closed") + } + }) + + done := make(chan struct{}) + go func() { + defer close(done) + m.Wait() + }() + + // While the goroutine is running, Wait does not complete. + select { + case <-done: + t.Error("monitor is ready before its goroutine is finished") + default: + // OK + } + + release(cli, m) + select { + case <-done: + // OK + case <-time.After(time.Second): + t.Fatal("timeout waiting for monitor to complete") + } + } + } + t.Run("Close", testMon(t, func(_ *eventbus.Client, m eventbus.Monitor) { m.Close() })) + t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() })) +} + type queueChecker struct { t *testing.T want []any diff --git a/util/eventbus/monitor.go b/util/eventbus/monitor.go new file mode 100644 index 000000000..18cc2a413 --- /dev/null +++ b/util/eventbus/monitor.go @@ -0,0 +1,42 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +// A Monitor monitors the execution of a goroutine processing events from a +// [Client], allowing the caller to block until it is complete. The zero value +// of m is valid and its Close and Wait methods return immediately. +type Monitor struct { + // These fields are immutable after initialization + cli *Client + done <-chan struct{} +} + +// Close closes the client associated with m and blocks until the processing +// goroutine is complete. +func (m Monitor) Close() { + if m.cli == nil { + return + } + m.cli.Close() + <-m.done +} + +// Wait blocks until the goroutine monitored by m has finished executing, but +// does not close the associated client. It is safe to call Wait repeatedly, +// and from multiple concurrent goroutines. +func (m Monitor) Wait() { + if m.done == nil { + return + } + <-m.done +} + +// Monitor executes f in a new goroutine attended by a [Monitor]. The caller +// is responsible for waiting for the goroutine to complete, by calling either +// [Monitor.Close] or [Monitor.Wait]. +func (c *Client) Monitor(f func(*Client)) Monitor { + done := make(chan struct{}) + go func() { defer close(done); f(c) }() + return Monitor{cli: c, done: done} +}