From 5b5ae2b2eea44f30ea4afe78f2176d1b3fcd4809 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 16 Sep 2025 07:44:08 -0700 Subject: [PATCH] util/eventbus: add a Done channel to the Client (#17118) Subscribers already have a Done channel that the caller can use to detect when the subscriber has been closed. Typically this happens when the governing Client closes, which in turn is typically because the Bus closed. But clients and subscribers can stop at other times too, and a caller has no good way to tell the difference between "this subscriber closed but the rest are OK" and "the client closed and all these subscribers are finished". We've worked around this in practice by knowing the closure of one subscriber implies the fate of the rest, but we can do better: Add a Done method to the Client that allows us to tell when that has been closed explicitly, after all the publishers and subscribers associated with that client have been closed. This allows the caller to be sure that, by the time that occurs, no further pending events are forthcoming on that client. Updates #15160 Change-Id: Id601a79ba043365ecdb47dd035f1fdadd984f303 Signed-off-by: M. J. Fromberger --- util/eventbus/bus_test.go | 45 ++++++++++++++++++++++++++++++++++++++- util/eventbus/client.go | 13 ++++++++--- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index e159b6a12..9fd0e4409 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -27,7 +27,16 @@ func TestBus(t *testing.T) { defer b.Close() c := b.Client("TestSub") - defer c.Close() + cdone := c.Done() + defer func() { + c.Close() + select { + case <-cdone: + t.Log("Client close signal received (OK)") + case <-time.After(time.Second): + t.Error("timed out waiting for client close signal") + } + }() s := eventbus.Subscribe[EventA](c) go func() { @@ -178,6 +187,40 @@ func TestSpam(t *testing.T) { // subsequences of the received slices. } +func TestClient_Done(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + s := eventbus.Subscribe[string](c) + + // The client is not Done until closed. + select { + case <-c.Done(): + t.Fatal("Client done before being closed") + default: + // OK + } + + go c.Close() + + // Once closed, the client becomes Done. + select { + case <-c.Done(): + // OK + case <-time.After(time.Second): + t.Fatal("timeout waiting for Client to be done") + } + + // Thereafter, the subscriber should also be closed. + select { + case <-s.Done(): + // OK + case <-time.After(time.Second): + t.Fatal("timoeout waiting for Subscriber to be done") + } +} + type queueChecker struct { t *testing.T want []any diff --git a/util/eventbus/client.go b/util/eventbus/client.go index a6266a4d8..176b6f2bc 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -21,9 +21,10 @@ type Client struct { bus *Bus publishDebug hook[PublishedEvent] - mu sync.Mutex - pub set.Set[publisher] - sub *subscribeState // Lazily created on first subscribe + mu sync.Mutex + pub set.Set[publisher] + sub *subscribeState // Lazily created on first subscribe + stop stopFlag // signaled on Close } func (c *Client) Name() string { return c.name } @@ -47,8 +48,14 @@ func (c *Client) Close() { for p := range pub { p.Close() } + c.stop.Stop() } +// Done returns a channel that is closed when [Client.Close] is called. +// The channel is closed after all the publishers and subscribers governed by +// the client have been closed. +func (c *Client) Done() <-chan struct{} { return c.stop.Done() } + func (c *Client) snapshotSubscribeQueue() []DeliveredEvent { return c.peekSubscribeState().snapshotQueue() }