diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index e159b6a12..9fd0e4409 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -27,7 +27,16 @@ func TestBus(t *testing.T) { defer b.Close() c := b.Client("TestSub") - defer c.Close() + cdone := c.Done() + defer func() { + c.Close() + select { + case <-cdone: + t.Log("Client close signal received (OK)") + case <-time.After(time.Second): + t.Error("timed out waiting for client close signal") + } + }() s := eventbus.Subscribe[EventA](c) go func() { @@ -178,6 +187,40 @@ func TestSpam(t *testing.T) { // subsequences of the received slices. } +func TestClient_Done(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + s := eventbus.Subscribe[string](c) + + // The client is not Done until closed. + select { + case <-c.Done(): + t.Fatal("Client done before being closed") + default: + // OK + } + + go c.Close() + + // Once closed, the client becomes Done. + select { + case <-c.Done(): + // OK + case <-time.After(time.Second): + t.Fatal("timeout waiting for Client to be done") + } + + // Thereafter, the subscriber should also be closed. + select { + case <-s.Done(): + // OK + case <-time.After(time.Second): + t.Fatal("timoeout waiting for Subscriber to be done") + } +} + type queueChecker struct { t *testing.T want []any diff --git a/util/eventbus/client.go b/util/eventbus/client.go index a6266a4d8..176b6f2bc 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -21,9 +21,10 @@ type Client struct { bus *Bus publishDebug hook[PublishedEvent] - mu sync.Mutex - pub set.Set[publisher] - sub *subscribeState // Lazily created on first subscribe + mu sync.Mutex + pub set.Set[publisher] + sub *subscribeState // Lazily created on first subscribe + stop stopFlag // signaled on Close } func (c *Client) Name() string { return c.name } @@ -47,8 +48,14 @@ func (c *Client) Close() { for p := range pub { p.Close() } + c.stop.Stop() } +// Done returns a channel that is closed when [Client.Close] is called. +// The channel is closed after all the publishers and subscribers governed by +// the client have been closed. +func (c *Client) Done() <-chan struct{} { return c.stop.Done() } + func (c *Client) snapshotSubscribeQueue() []DeliveredEvent { return c.peekSubscribeState().snapshotQueue() }