util/eventbus: add a Done channel to the Client (#17118)

Subscribers already have a Done channel that the caller can use to detect when
the subscriber has been closed. Typically this happens when the governing
Client closes, which in turn is typically because the Bus closed.

But clients and subscribers can stop at other times too, and a caller has no
good way to tell the difference between "this subscriber closed but the rest
are OK" and "the client closed and all these subscribers are finished".

We've worked around this in practice by knowing the closure of one subscriber
implies the fate of the rest, but we can do better: Add a Done method to the
Client that allows us to tell when that has been closed explicitly, after all
the publishers and subscribers associated with that client have been closed.
This allows the caller to be sure that, by the time that occurs, no further
pending events are forthcoming on that client.

Updates #15160

Change-Id: Id601a79ba043365ecdb47dd035f1fdadd984f303
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-09-16 07:44:08 -07:00 committed by GitHub
parent 5ad3bd9f47
commit 5b5ae2b2ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 4 deletions

View File

@ -27,7 +27,16 @@ func TestBus(t *testing.T) {
defer b.Close() defer b.Close()
c := b.Client("TestSub") c := b.Client("TestSub")
defer c.Close() cdone := c.Done()
defer func() {
c.Close()
select {
case <-cdone:
t.Log("Client close signal received (OK)")
case <-time.After(time.Second):
t.Error("timed out waiting for client close signal")
}
}()
s := eventbus.Subscribe[EventA](c) s := eventbus.Subscribe[EventA](c)
go func() { go func() {
@ -178,6 +187,40 @@ func TestSpam(t *testing.T) {
// subsequences of the received slices. // subsequences of the received slices.
} }
func TestClient_Done(t *testing.T) {
b := eventbus.New()
defer b.Close()
c := b.Client(t.Name())
s := eventbus.Subscribe[string](c)
// The client is not Done until closed.
select {
case <-c.Done():
t.Fatal("Client done before being closed")
default:
// OK
}
go c.Close()
// Once closed, the client becomes Done.
select {
case <-c.Done():
// OK
case <-time.After(time.Second):
t.Fatal("timeout waiting for Client to be done")
}
// Thereafter, the subscriber should also be closed.
select {
case <-s.Done():
// OK
case <-time.After(time.Second):
t.Fatal("timoeout waiting for Subscriber to be done")
}
}
type queueChecker struct { type queueChecker struct {
t *testing.T t *testing.T
want []any want []any

View File

@ -21,9 +21,10 @@ type Client struct {
bus *Bus bus *Bus
publishDebug hook[PublishedEvent] publishDebug hook[PublishedEvent]
mu sync.Mutex mu sync.Mutex
pub set.Set[publisher] pub set.Set[publisher]
sub *subscribeState // Lazily created on first subscribe sub *subscribeState // Lazily created on first subscribe
stop stopFlag // signaled on Close
} }
func (c *Client) Name() string { return c.name } func (c *Client) Name() string { return c.name }
@ -47,8 +48,14 @@ func (c *Client) Close() {
for p := range pub { for p := range pub {
p.Close() p.Close()
} }
c.stop.Stop()
} }
// Done returns a channel that is closed when [Client.Close] is called.
// The channel is closed after all the publishers and subscribers governed by
// the client have been closed.
func (c *Client) Done() <-chan struct{} { return c.stop.Done() }
func (c *Client) snapshotSubscribeQueue() []DeliveredEvent { func (c *Client) snapshotSubscribeQueue() []DeliveredEvent {
return c.peekSubscribeState().snapshotQueue() return c.peekSubscribeState().snapshotQueue()
} }