mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-12 04:41:36 +01:00
util/eventbus: add tests for a subscriber publishing events
As of 2025-11-20, publishing more events than the eventbus's internal queues can hold may deadlock if a subscriber tries to publish events itself. This commit adds a test that demonstrates this deadlock, and skips it until the bug is fixed. Updates #18012 Signed-off-by: Nick Khyl <nickk@tailscale.com>
This commit is contained in:
parent
016ccae2da
commit
3780f25d51
@ -636,6 +636,7 @@ func testPublishWithMutex(t *testing.T, n int) {
|
|||||||
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
|
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
|
||||||
|
// Acquire the same mutex as the publisher.
|
||||||
// As of 2025-11-20, this can deadlock if n is large enough
|
// As of 2025-11-20, this can deadlock if n is large enough
|
||||||
// and event queues fill up.
|
// and event queues fill up.
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
@ -648,6 +649,7 @@ func testPublishWithMutex(t *testing.T, n int) {
|
|||||||
|
|
||||||
p := eventbus.Publish[EventA](c)
|
p := eventbus.Publish[EventA](c)
|
||||||
go func() {
|
go func() {
|
||||||
|
// Publish events, acquiring the mutex around each publish.
|
||||||
for i := range n {
|
for i := range n {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
p.Publish(EventA{Counter: i})
|
p.Publish(EventA{Counter: i})
|
||||||
@ -663,6 +665,64 @@ func testPublishWithMutex(t *testing.T, n int) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPublishFromSubscriber(t *testing.T) {
|
||||||
|
t.Run("FewEvents", func(t *testing.T) {
|
||||||
|
// Publishing up to [totalMaxQueuedItems]-1 is fine.
|
||||||
|
testPublishFromSubscriber(t, totalMaxQueuedItems-1)
|
||||||
|
})
|
||||||
|
t.Run("ManyEvents", func(t *testing.T) {
|
||||||
|
// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
|
||||||
|
t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/18012")
|
||||||
|
|
||||||
|
// Using 2x to increase chance of deadlock.
|
||||||
|
testPublishFromSubscriber(t, totalMaxQueuedItems*2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// testPublishFromSubscriber publishes the specified number of EventA events.
|
||||||
|
// Each EventA causes the subscriber to publish an EventB.
|
||||||
|
// The test fails if it loses any events or if a deadlock occurs.
|
||||||
|
func testPublishFromSubscriber(t *testing.T, n int) {
|
||||||
|
synctest.Test(t, func(t *testing.T) {
|
||||||
|
b := eventbus.New()
|
||||||
|
defer b.Close()
|
||||||
|
|
||||||
|
c := b.Client("TestClient")
|
||||||
|
|
||||||
|
// Ultimately we expect to receive n EventB events
|
||||||
|
// published as a result of receiving n EventA events.
|
||||||
|
evts := make([]any, n)
|
||||||
|
for i := range evts {
|
||||||
|
evts[i] = EventB{Counter: i}
|
||||||
|
}
|
||||||
|
exp := expectEvents(t, evts...)
|
||||||
|
|
||||||
|
pubA := eventbus.Publish[EventA](c)
|
||||||
|
pubB := eventbus.Publish[EventB](c)
|
||||||
|
|
||||||
|
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
|
||||||
|
// Upon receiving EventA, publish EventB.
|
||||||
|
// As of 2025-11-20, this can deadlock if n is large enough
|
||||||
|
// and event queues fill up.
|
||||||
|
pubB.Publish(EventB{Counter: e.Counter})
|
||||||
|
})
|
||||||
|
eventbus.SubscribeFunc[EventB](c, func(e EventB) {
|
||||||
|
// Mark EventB as received.
|
||||||
|
exp.Got(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := range n {
|
||||||
|
pubA.Publish(EventA{Counter: i})
|
||||||
|
}
|
||||||
|
|
||||||
|
synctest.Wait()
|
||||||
|
|
||||||
|
if !exp.Empty() {
|
||||||
|
t.Errorf("unexpected extra events: %+v", exp.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type queueChecker struct {
|
type queueChecker struct {
|
||||||
t *testing.T
|
t *testing.T
|
||||||
want []any
|
want []any
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user