tailscale/util/eventbus/bus_test.go
M. J. Fromberger ca9d795006
util/eventbus: add a Monitor type to manage subscriber goroutines (#17127)
A common pattern in event bus usage is to run a goroutine to service a
collection of subscribers on a single bus client. To have an orderly shutdown,
however, we need a way to wait for such a goroutine to be finished.

This commit adds a Monitor type that makes this pattern easier to wire up:
rather than having to track all the subscribers and an extra channel, the
component need only track the client and the monitor.  For example:

   cli := bus.Client("example")
   m := cli.Monitor(func(c *eventbus.Client) {
     s1 := eventbus.Subscribe[T](cli)
     s2 := eventbus.Subscribe[U](cli)
     for {
       select {
       case <-c.Done():
         return
       case t := <-s1.Events():
         processT(t)
       case u := <-s2.Events():
         processU(u)
       }
     }
   })

To shut down the client and wait for the goroutine, the caller can write:

   m.Close()

which closes cli and waits for the goroutine to finish. Or, separately:

   cli.Close()
   // do other stuff
   m.Wait()

While the goroutine management is not explicitly tied to subscriptions, it is a
common enough pattern that this seems like a useful simplification in use.

Updates #15160

Change-Id: I657afda1cfaf03465a9dce1336e9fd518a968bca
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2025-09-19 12:34:06 -07:00

320 lines
6.9 KiB
Go

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus_test
import (
"errors"
"fmt"
"testing"
"time"
"github.com/creachadair/taskgroup"
"github.com/google/go-cmp/cmp"
"tailscale.com/util/eventbus"
)
// Event payload types used by the tests in this file. They have the same
// shape but are distinct types, so a subscription to one does not match
// values of the other.
type (
	// EventA is the first test event type; Counter identifies the event.
	EventA struct {
		Counter int
	}

	// EventB is the second test event type; Counter identifies the event.
	EventB struct {
		Counter int
	}
)
// TestBus publishes a mix of EventA and EventB values from one client and
// checks that an EventA subscription on another client receives the EventA
// values in publication order. On exit it also checks that closing the
// subscribing client signals the channel previously returned by Done.
func TestBus(t *testing.T) {
	bus := eventbus.New()
	defer bus.Close()

	subClient := bus.Client("TestSub")
	closed := subClient.Done()
	// Close the subscribing client on the way out and verify that the Done
	// channel captured above fires within a second.
	defer func() {
		subClient.Close()
		select {
		case <-time.After(time.Second):
			t.Error("timed out waiting for client close signal")
		case <-closed:
			t.Log("Client close signal received (OK)")
		}
	}()

	sub := eventbus.Subscribe[EventA](subClient)

	// Publish from a separate goroutine and client, interleaving the two
	// event types.
	go func() {
		pubClient := bus.Client("TestPub")
		defer pubClient.Close()
		pubA := eventbus.Publish[EventA](pubClient)
		defer pubA.Close()
		pubB := eventbus.Publish[EventB](pubClient)
		defer pubB.Close()

		pubA.Publish(EventA{1})
		pubB.Publish(EventB{2})
		pubA.Publish(EventA{3})
	}()

	// Drain the subscription until every expected event has been seen.
	pending := expectEvents(t, EventA{1}, EventA{3})
	for {
		if pending.Empty() {
			return
		}
		select {
		case ev := <-sub.Events():
			pending.Got(ev)
		case <-sub.Done():
			t.Fatalf("queue closed unexpectedly")
		case <-time.After(time.Second):
			t.Fatalf("timed out waiting for event")
		}
	}
}
// TestBusMultipleConsumers publishes interleaved EventA and EventB values and
// checks delivery to two subscribing clients: one subscribed to EventA only,
// and one subscribed to both types. The second client's two subscriptions are
// checked against a single combined expectation, in publication order.
func TestBusMultipleConsumers(t *testing.T) {
	bus := eventbus.New()
	defer bus.Close()

	// First consumer: EventA only.
	clientA := bus.Client("TestSubA")
	defer clientA.Close()
	subA := eventbus.Subscribe[EventA](clientA)

	// Second consumer: both event types on one client.
	clientB := bus.Client("TestSubB")
	defer clientB.Close()
	subBA := eventbus.Subscribe[EventA](clientB)
	subBB := eventbus.Subscribe[EventB](clientB)

	// Publish from a separate goroutine and client.
	go func() {
		pubClient := bus.Client("TestPub")
		defer pubClient.Close()
		pubA := eventbus.Publish[EventA](pubClient)
		defer pubA.Close()
		pubB := eventbus.Publish[EventB](pubClient)
		defer pubB.Close()

		pubA.Publish(EventA{1})
		pubB.Publish(EventB{2})
		pubA.Publish(EventA{3})
	}()

	pendingA := expectEvents(t, EventA{1}, EventA{3})
	pendingB := expectEvents(t, EventA{1}, EventB{2}, EventA{3})
	for {
		if pendingA.Empty() && pendingB.Empty() {
			return
		}
		select {
		case ev := <-subA.Events():
			pendingA.Got(ev)
		case ev := <-subBA.Events():
			pendingB.Got(ev)
		case ev := <-subBB.Events():
			pendingB.Got(ev)
		case <-subA.Done():
			t.Fatalf("queue closed unexpectedly")
		case <-subBA.Done():
			t.Fatalf("queue closed unexpectedly")
		case <-subBB.Done():
			t.Fatalf("queue closed unexpectedly")
		case <-time.After(time.Second):
			t.Fatalf("timed out waiting for event")
		}
	}
}
// TestSpam runs many concurrent publishers against many concurrent
// subscribers. It checks that every subscriber receives the full number of
// events, that all subscribers observe the same sequence of events, and that
// every publisher sent the expected number of events.
func TestSpam(t *testing.T) {
	b := eventbus.New()
	defer b.Close()

	const (
		publishers         = 100
		eventsPerPublisher = 20
		wantEvents         = publishers * eventsPerPublisher
		subscribers        = 100
	)

	var g taskgroup.Group

	// Each subscriber goroutine appends only to its own slot of received,
	// and the slots are read only after g.Wait returns.
	received := make([][]EventA, subscribers)
	for i := range subscribers {
		c := b.Client(fmt.Sprintf("Subscriber%d", i))
		// Deliberately deferred to the end of the test: the client must
		// outlive the receiving goroutine started below.
		defer c.Close()
		s := eventbus.Subscribe[EventA](c)
		g.Go(func() error {
			for range wantEvents {
				select {
				case evt := <-s.Events():
					received[i] = append(received[i], evt)
				case <-s.Done():
					t.Errorf("queue done before expected number of events received")
					return errors.New("queue prematurely closed")
				case <-time.After(5 * time.Second):
					t.Errorf("timed out waiting for expected bus event after %d events", len(received[i]))
					return errors.New("timeout")
				}
			}
			return nil
		})
	}

	// Each publisher sends a disjoint range of counter values and records
	// what it sent in its own slot of published.
	published := make([][]EventA, publishers)
	for i := range publishers {
		g.Run(func() {
			c := b.Client(fmt.Sprintf("Publisher%d", i))
			p := eventbus.Publish[EventA](c)
			for j := range eventsPerPublisher {
				evt := EventA{i*eventsPerPublisher + j}
				p.Publish(evt)
				published[i] = append(published[i], evt)
			}
		})
	}

	if err := g.Wait(); err != nil {
		t.Fatal(err)
	}

	// Compare each subscriber's sequence with its predecessor's. The
	// previous form of this loop tracked the predecessor in a variable that
	// was never assigned (the nil check skipped the assignment), so the
	// comparison was never executed.
	for i, got := range received {
		if len(got) != wantEvents {
			// Receiving goroutine already reported an error, we just need
			// to fail early within the main test goroutine.
			t.FailNow()
		}
		if i == 0 {
			continue // no predecessor to compare against
		}
		if diff := cmp.Diff(got, received[i-1]); diff != "" {
			t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff)
		}
	}

	for i, sent := range published {
		if got := len(sent); got != eventsPerPublisher {
			t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher)
		}
	}

	// TODO: check that the published sequences are proper
	// subsequences of the received slices.
}
// TestClient_Done checks the Done channels of a client and of a subscriber
// attached to it: the client's Done channel must not be ready before Close is
// called, must become ready after Close, and the subscriber's Done channel
// must become ready as well.
func TestClient_Done(t *testing.T) {
	b := eventbus.New()
	defer b.Close()

	c := b.Client(t.Name())
	s := eventbus.Subscribe[string](c)

	// The client is not Done until closed.
	select {
	case <-c.Done():
		t.Fatal("Client done before being closed")
	default:
		// OK
	}

	// Close concurrently so the test observes Done from another goroutine
	// instead of relying on Close having already returned.
	go c.Close()

	// Once closed, the client becomes Done.
	select {
	case <-c.Done():
		// OK
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for Client to be done")
	}

	// Thereafter, the subscriber should also be closed.
	select {
	case <-s.Done():
		// OK
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for Subscriber to be done")
	}
}
// TestMonitor exercises eventbus.Monitor. It checks that Wait and Close on a
// zero Monitor return within a second, and that a Monitor obtained from a
// client does not report completion while its goroutine is still running but
// does so once the client has been released.
func TestMonitor(t *testing.T) {
	t.Run("ZeroWait", func(t *testing.T) {
		var mon eventbus.Monitor

		returned := make(chan struct{})
		go func() {
			mon.Wait()
			close(returned)
		}()

		select {
		case <-time.After(time.Second):
			t.Fatal("timeout waiting for Wait to return")
		case <-returned:
			// OK
		}
	})

	t.Run("ZeroClose", func(t *testing.T) {
		var mon eventbus.Monitor

		returned := make(chan struct{})
		go func() {
			mon.Close()
			close(returned)
		}()

		select {
		case <-time.After(time.Second):
			t.Fatal("timeout waiting for Close to return")
		case <-returned:
			// OK
		}
	})

	// testMon returns a subtest body that starts a monitored goroutine on a
	// fresh client, verifies that Wait is still pending, invokes release, and
	// then requires Wait to complete within a second.
	testMon := func(t *testing.T, release func(*eventbus.Client, eventbus.Monitor)) func(t *testing.T) {
		t.Helper()
		return func(t *testing.T) {
			bus := eventbus.New()
			cli := bus.Client("test client")

			// The monitored goroutine runs until the client or test subscription ends.
			mon := cli.Monitor(func(c *eventbus.Client) {
				sub := eventbus.Subscribe[string](cli)
				select {
				case <-sub.Done():
					t.Log("subscription closed")
				case <-c.Done():
					t.Log("client closed")
				}
			})

			// waited is closed once a concurrent Wait call has returned.
			waited := make(chan struct{})
			go func() {
				defer close(waited)
				mon.Wait()
			}()

			// While the goroutine is running, Wait does not complete.
			select {
			case <-waited:
				t.Error("monitor is ready before its goroutine is finished")
			default:
				// OK
			}

			release(cli, mon)

			select {
			case <-time.After(time.Second):
				t.Fatal("timeout waiting for monitor to complete")
			case <-waited:
				// OK
			}
		}
	}

	// The two supported ways of shutting down: through the monitor alone, or
	// by closing the client and then waiting on the monitor.
	cases := []struct {
		name    string
		release func(*eventbus.Client, eventbus.Monitor)
	}{
		{"Close", func(_ *eventbus.Client, m eventbus.Monitor) { m.Close() }},
		{"Wait", func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() }},
	}
	for _, tc := range cases {
		t.Run(tc.name, testMon(t, tc.release))
	}
}
// queueChecker holds the ordered list of values a test still expects to
// receive, together with the test to fail when an expectation is violated.
type queueChecker struct {
	t    *testing.T
	want []any
}

// expectEvents returns a queueChecker that expects exactly the given values,
// in the given order.
func expectEvents(t *testing.T, want ...any) *queueChecker {
	qc := new(queueChecker)
	qc.t = t
	qc.want = want
	return qc
}

// Got records that v was received. It fails the test immediately if nothing
// further was expected or if v is not equal to the next expected value;
// otherwise it removes that value from the pending list.
func (q *queueChecker) Got(v any) {
	q.t.Helper()
	if len(q.want) == 0 {
		q.t.Fatalf("queue got unexpected %v", v)
	}
	next, rest := q.want[0], q.want[1:]
	if next != v {
		q.t.Fatalf("queue got %#v, want %#v", v, next)
	}
	q.want = rest
}

// Empty reports whether every expected value has been received.
func (q *queueChecker) Empty() bool {
	return len(q.want) < 1
}