diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index f9e7ee3dd..de292cf1a 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "testing" + "testing/synctest" "time" "github.com/creachadair/taskgroup" @@ -64,6 +65,55 @@ func TestBus(t *testing.T) { } } +func TestSubscriberFunc(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestClient") + + exp := expectEvents(t, EventA{12345}) + eventbus.SubscribeFunc[EventA](c, func(e EventA) { exp.Got(e) }) + + p := eventbus.Publish[EventA](c) + p.Publish(EventA{12345}) + + synctest.Wait() + c.Close() + + if !exp.Empty() { + t.Errorf("unexpected extra events: %+v", exp.want) + } + }) + + t.Run("SubscriberPublishes", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestClient") + pa := eventbus.Publish[EventA](c) + pb := eventbus.Publish[EventB](c) + exp := expectEvents(t, EventA{127}, EventB{128}) + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + exp.Got(e) + pb.Publish(EventB{Counter: e.Counter + 1}) + }) + eventbus.SubscribeFunc[EventB](c, func(e EventB) { + exp.Got(e) + }) + + pa.Publish(EventA{127}) + + synctest.Wait() + c.Close() + if !exp.Empty() { + t.Errorf("unepxected extra events: %+v", exp.want) + } + }) + }) +} + func TestBusMultipleConsumers(t *testing.T) { b := eventbus.New() defer b.Close() @@ -111,80 +161,149 @@ func TestBusMultipleConsumers(t *testing.T) { } } -func TestSpam(t *testing.T) { - b := eventbus.New() - defer b.Close() +func TestClientMixedSubscribers(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() - const ( - publishers = 100 - eventsPerPublisher = 20 - wantEvents = publishers * eventsPerPublisher - subscribers = 100 - ) + c := b.Client("TestClient") - var g taskgroup.Group + var gotA EventA + s1 := eventbus.Subscribe[EventA](c) - received := make([][]EventA, subscribers) - for i := range subscribers { - c := b.Client(fmt.Sprintf("Subscriber%d", i)) - defer c.Close() - s := eventbus.Subscribe[EventA](c) - g.Go(func() error { - for range wantEvents { + var gotB EventB + eventbus.SubscribeFunc[EventB](c, func(e EventB) { + t.Logf("func sub received %[1]T %+[1]v", e) + gotB = e + }) + + go func() { + for { select { - case evt := <-s.Events(): - received[i] = append(received[i], evt) - case <-s.Done(): - t.Errorf("queue done before expected number of events received") - return errors.New("queue prematurely closed") - case <-time.After(5 * time.Second): - t.Errorf("timed out waiting for expected bus event after %d events", len(received[i])) - return errors.New("timeout") + case <-s1.Done(): + return + case e := <-s1.Events(): + t.Logf("chan sub received %[1]T %+[1]v", e) + gotA = e } } - return nil - }) - } + }() - published := make([][]EventA, publishers) - for i := range publishers { - g.Run(func() { + p1 := eventbus.Publish[EventA](c) + p2 := eventbus.Publish[EventB](c) + + go p1.Publish(EventA{12345}) + go p2.Publish(EventB{67890}) + + synctest.Wait() + c.Close() + synctest.Wait() + + if diff := cmp.Diff(gotB, EventB{67890}); diff != "" { + t.Errorf("Chan sub (-got, +want):\n%s", diff) + } + if diff := cmp.Diff(gotA, EventA{12345}); diff != "" { + t.Errorf("Func sub (-got, +want):\n%s", diff) + } + }) +} + +func TestSpam(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + const ( + publishers = 100 + eventsPerPublisher = 20 + wantEvents = publishers * eventsPerPublisher + subscribers = 100 + ) + + var g taskgroup.Group + + // A bunch of subscribers receiving on channels. + chanReceived := make([][]EventA, subscribers) + for i := range subscribers { + c := b.Client(fmt.Sprintf("Subscriber%d", i)) + defer c.Close() + + s := eventbus.Subscribe[EventA](c) + g.Go(func() error { + for range wantEvents { + select { + case evt := <-s.Events(): + chanReceived[i] = append(chanReceived[i], evt) + case <-s.Done(): + t.Errorf("queue done before expected number of events received") + return errors.New("queue prematurely closed") + case <-time.After(5 * time.Second): + t.Logf("timed out waiting for expected bus event after %d events", len(chanReceived[i])) + return errors.New("timeout") + } + } + return nil + }) + } + + // A bunch of subscribers receiving via a func. + funcReceived := make([][]EventA, subscribers) + for i := range subscribers { + c := b.Client(fmt.Sprintf("SubscriberFunc%d", i)) + defer c.Close() + eventbus.SubscribeFunc(c, func(e EventA) { + funcReceived[i] = append(funcReceived[i], e) + }) + } + + published := make([][]EventA, publishers) + for i := range publishers { c := b.Client(fmt.Sprintf("Publisher%d", i)) p := eventbus.Publish[EventA](c) - for j := range eventsPerPublisher { - evt := EventA{i*eventsPerPublisher + j} - p.Publish(evt) - published[i] = append(published[i], evt) + g.Run(func() { + defer c.Close() + for j := range eventsPerPublisher { + evt := EventA{i*eventsPerPublisher + j} + p.Publish(evt) + published[i] = append(published[i], evt) + } + }) + } + + if err := g.Wait(); err != nil { + t.Fatal(err) + } + synctest.Wait() + + tests := []struct { + name string + recv [][]EventA + }{ + {"Subscriber", chanReceived}, + {"SubscriberFunc", funcReceived}, + } + for _, tc := range tests { + for i, got := range tc.recv { + if len(got) != wantEvents { + t.Errorf("%s %d: got %d events, want %d", tc.name, i, len(got), wantEvents) + } + if i == 0 { + continue + } + if diff := cmp.Diff(got, tc.recv[i-1]); diff != "" { + t.Errorf("%s %d did not see the same events as %d (-got+want):\n%s", tc.name, i, i-1, diff) + } } - }) - } + } + for i, sent := range published { + if got := len(sent); got != eventsPerPublisher { + t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher) + } + } - if err := g.Wait(); err != nil { - t.Fatal(err) - } - var last []EventA - for i, got := range received { - if len(got) != wantEvents { - // Receiving goroutine already reported an error, we just need - // to fail early within the main test goroutine. - t.FailNow() - } - if last == nil { - continue - } - if diff := cmp.Diff(got, last); diff != "" { - t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff) - } - last = got - } - for i, sent := range published { - if got := len(sent); got != eventsPerPublisher { - t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher) - } - } - - // TODO: check that the published sequences are proper - // subsequences of the received slices. + // TODO: check that the published sequences are proper + // subsequences of the received slices. + }) } func TestClient_Done(t *testing.T) { @@ -366,10 +485,12 @@ func expectEvents(t *testing.T, want ...any) *queueChecker { func (q *queueChecker) Got(v any) { q.t.Helper() if q.Empty() { - q.t.Fatalf("queue got unexpected %v", v) + q.t.Errorf("queue got unexpected %v", v) + return } if v != q.want[0] { - q.t.Fatalf("queue got %#v, want %#v", v, q.want[0]) + q.t.Errorf("queue got %#v, want %#v", v, q.want[0]) + return } q.want = q.want[1:] } diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 7c0268886..9e3f3ee76 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -147,6 +147,29 @@ func Subscribe[T any](c *Client) *Subscriber[T] { return s } +// SubscribeFunc is like [Subscribe], but calls the provided func for each +// event of type T. +// +// A SubscriberFunc calls f synchronously from the client's goroutine. +// This means the callback must not block for an extended period of time, +// as this will block the subscriber and slow event processing for all +// subscriptions on c. +func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { + c.mu.Lock() + defer c.mu.Unlock() + + // The caller should not race subscriptions with close, give them a useful + // diagnostic at the call site. + if c.isClosed() { + panic("cannot SubscribeFunc on a closed client") + } + + r := c.subscribeStateLocked() + s := newSubscriberFunc[T](r, f) + r.addSubscriber(s) + return s +} + // Publish returns a publisher for event type T using the given client. // It panics if c is closed. func Publish[T any](c *Client) *Publisher[T] { diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ef155e621..56da413ef 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -61,45 +61,45 @@ func newSubscribeState(c *Client) *subscribeState { return ret } -func (q *subscribeState) pump(ctx context.Context) { +func (s *subscribeState) pump(ctx context.Context) { var vals queue[DeliveredEvent] acceptCh := func() chan DeliveredEvent { if vals.Full() { return nil } - return q.write + return s.write } for { if !vals.Empty() { val := vals.Peek() - sub := q.subscriberFor(val.Event) + sub := s.subscriberFor(val.Event) if sub == nil { // Raced with unsubscribe. vals.Drop() continue } - if !sub.dispatch(ctx, &vals, acceptCh, q.snapshot) { + if !sub.dispatch(ctx, &vals, acceptCh, s.snapshot) { return } - if q.debug.active() { - q.debug.run(DeliveredEvent{ + if s.debug.active() { + s.debug.run(DeliveredEvent{ Event: val.Event, From: val.From, - To: q.client, + To: s.client, }) } } else { // Keep the cases in this select in sync with - // Subscriber.dispatch below. The only difference should be - // that this select doesn't deliver queued values to - // anyone, and unconditionally accepts new values. + // Subscriber.dispatch and SubscriberFunc.dispatch below. + // The only difference should be that this select doesn't deliver + // queued values to anyone, and unconditionally accepts new values. select { - case val := <-q.write: + case val := <-s.write: vals.Add(val) case <-ctx.Done(): return - case ch := <-q.snapshot: + case ch := <-s.snapshot: ch <- vals.Snapshot() } } @@ -152,10 +152,10 @@ func (s *subscribeState) deleteSubscriber(t reflect.Type) { s.client.deleteSubscriber(t, s) } -func (q *subscribeState) subscriberFor(val any) subscriber { - q.outputsMu.Lock() - defer q.outputsMu.Unlock() - return q.outputs[reflect.TypeOf(val)] +func (s *subscribeState) subscriberFor(val any) subscriber { + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + return s.outputs[reflect.TypeOf(val)] } // Close closes the subscribeState. It implicitly closes all Subscribers @@ -177,6 +177,7 @@ func (s *subscribeState) closed() <-chan struct{} { } // A Subscriber delivers one type of event from a [Client]. +// Events are sent to the [Subscriber.Events] channel. type Subscriber[T any] struct { stop stopFlag read chan T @@ -252,3 +253,49 @@ func (s *Subscriber[T]) Close() { s.stop.Stop() // unblock receivers s.unregister() } + +// A SubscriberFunc delivers one type of event from a [Client]. +// Events are forwarded synchronously to a function provided at construction. +type SubscriberFunc[T any] struct { + stop stopFlag + read func(T) + unregister func() +} + +func newSubscriberFunc[T any](r *subscribeState, f func(T)) *SubscriberFunc[T] { + return &SubscriberFunc[T]{ + read: f, + unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + } +} + +// Close closes the SubscriberFunc, indicating the caller no longer wishes to +// receive this event type. After Close, no further events will be passed to +// the callback. +// +// If the [Bus] from which s was created is closed, s is implicitly closed and +// does not need to be closed separately. +func (s *SubscriberFunc[T]) Close() { s.stop.Stop(); s.unregister() } + +// subscribeType implements part of the subscriber interface. +func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } + +// dispatch implements part of the subscriber interface. +func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { + // Keep the cases in this select in sync with subscribeState.pump + // above. The only different should be that this select + // delivers a value by calling s.read. + select { + case val := <-acceptCh(): + vals.Add(val) + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + default: + } + t := vals.Peek().Event.(T) + s.read(t) + vals.Drop() + return true +}