mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-07 21:26:41 +02:00
util/eventbus: extract SubscriberFunc.dispatch loop to a non-generic helper
The (*SubscriberFunc[T]).dispatch method body — a ~40-line select
loop with slow-subscriber timer, snapshot handling, ctx-cancel
draining, and a CI stack-dump branch — was previously fully
duplicated by the Go compiler for every distinct GC shape of T.
None of that body actually depends on T except for the type
assertion and the user callback invocation.
This change moves the loop body into a non-generic dispatchFunc()
helper, leaving (*SubscriberFunc[T]).dispatch as a tiny wrapper
that:
- performs the vals.Peek().Event.(T) type assertion
- spawns the callback goroutine via
  `go runFuncCallback(s.read, t, callDone)` — a regular generic
  function call, not a closure, so that `go` binds the args to the
  goroutine's frame instead of allocating a closure on the heap.
  This preserves the zero-extra-allocation behavior of the original
  (*SubscriberFunc[T]).runCallback method.
- resolves T's name once per dispatch call via
  reflect.TypeFor[T]().String() and passes it along as a plain string
  (rather than formatting the value with %T at each log site)
- calls dispatchFunc with the callDone channel
The %T formatting in the original logf calls is replaced with %s
on the resolved name string, removing per-T fmt instantiations.
A new BenchmarkBasicFuncThroughput is added alongside the existing
BenchmarkBasicThroughput so per-event allocation behavior on the
SubscribeFunc dispatch path is covered by the benchmark suite.
Measured impact (util/eventbus/sizetest):
SubscriberFunc per-flow attribution:
linux/amd64: 912.5 B/flow -> 840.8 B/flow (-71.7 B/flow)
linux/arm64: 917.5 B/flow -> 849.9 B/flow (-67.6 B/flow)
The total per-flow size delta on amd64 dropped from 3,096.6 B to
3,039.2 B (-57 B/flow). The arm64 total stayed at 3,145.7 B
because the linker's page-aligned section sizing absorbed the
improvement on this binary; the symcost-attributed per-receiver
number is the real signal.
Behavior is unchanged: BenchmarkBasicThroughput stays at 0
allocs/op and BenchmarkBasicFuncThroughput holds at the same 2
allocs/op, 144 B/op as the prior eventbus implementation. All
eventbus tests pass.
Updates #12614
Change-Id: I85f933f50f58cd25bbfe5cc46bdda7aab22f0bf7
Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
parent
87a74c3aa2
commit
0def0f19bd
@ -39,6 +39,27 @@ func BenchmarkBasicThroughput(b *testing.B) {
|
||||
bus.Close()
|
||||
}
|
||||
|
||||
// BenchmarkBasicFuncThroughput is the SubscribeFunc analogue of
|
||||
// BenchmarkBasicThroughput: one publisher and one SubscribeFunc
|
||||
// callback, shoveling events as fast as they can through the
|
||||
// plumbing. Useful for tracking per-event allocation behavior on the
|
||||
// SubscribeFunc dispatch path.
|
||||
func BenchmarkBasicFuncThroughput(b *testing.B) {
|
||||
bus := eventbus.New()
|
||||
pcli := bus.Client(b.Name() + "-pub")
|
||||
scli := bus.Client(b.Name() + "-sub")
|
||||
|
||||
type emptyEvent [0]byte
|
||||
|
||||
pub := eventbus.Publish[emptyEvent](pcli)
|
||||
eventbus.SubscribeFunc(scli, func(emptyEvent) {})
|
||||
|
||||
for b.Loop() {
|
||||
pub.Publish(emptyEvent{})
|
||||
}
|
||||
bus.Close()
|
||||
}
|
||||
|
||||
func BenchmarkSubsThroughput(b *testing.B) {
|
||||
bus := eventbus.New()
|
||||
pcli := bus.Client(b.Name() + "-pub")
|
||||
|
||||
@ -306,18 +306,67 @@ func (s *SubscriberFunc[T]) Close() { s.stop.Stop(); s.unregister() }
|
||||
func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() }
|
||||
|
||||
// dispatch implements part of the subscriber interface.
|
||||
//
|
||||
// We deliberately keep this method body small and delegate the
|
||||
// dispatch loop to dispatchFunc, a non-generic helper. Each
|
||||
// instantiation of SubscriberFunc[T] otherwise produces a fresh
|
||||
// stencil of the full ~40-line select loop (including the slow-
|
||||
// subscriber timer, snapshot handling, and CI stack-dump branch),
|
||||
// which is responsible for hundreds of bytes of binary size per
|
||||
// distinct T. By isolating the per-T work to the type assertion
|
||||
// and the callback closure, only a small fixed-size wrapper is
|
||||
// emitted per T.
|
||||
func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
|
||||
t := vals.Peek().Event.(T)
|
||||
callDone := make(chan struct{})
|
||||
go s.runCallback(t, callDone)
|
||||
// Launch the user callback on a goroutine via a generic worker
|
||||
// rather than via a closure (`go func() { s.read(t) }()` would
|
||||
// allocate the closure object on the heap on every dispatched
|
||||
// event). `go runFuncCallback(s.read, t, callDone)` binds its
|
||||
// arguments directly to the goroutine's frame, leaving allocation
|
||||
// behavior identical to the original (*SubscriberFunc[T]).runCallback
|
||||
// method.
|
||||
go runFuncCallback(s.read, t, callDone)
|
||||
return dispatchFunc(ctx, dispatchFuncState{
|
||||
slow: s.slow,
|
||||
logf: s.logf,
|
||||
typeName: reflect.TypeFor[T]().String(),
|
||||
}, vals, acceptCh, snapshot, callDone)
|
||||
}
|
||||
|
||||
// dispatchFuncState is the non-generic state needed by dispatchFunc.
// Bundling these lets us pass them as one argument and keeps the
// per-T wrapper at the dispatch call site small.
type dispatchFuncState struct {
	// slow is the timer dispatchFunc resets and then waits on to
	// notice a callback that has not yet returned.
	slow *time.Timer

	// logf receives the "subscriber is slow" and "giving up on
	// subscriber" messages emitted by dispatchFunc.
	logf logger.Logf

	// typeName is the cached reflect.TypeFor[T]().String(), used to
	// name the event type in those log messages.
	typeName string
}
|
||||
|
||||
// dispatchFunc is the non-generic body of SubscriberFunc[T].dispatch.
|
||||
// It is identical in observable behavior to the original loop; the
|
||||
// only differences are that the dispatched value has already been
|
||||
// unboxed by the caller (and the user callback is already running
|
||||
// on its own goroutine, signaling completion via callDone) and the
|
||||
// type name has already been resolved (and is passed as a string).
|
||||
//
|
||||
// callDone is closed by runFuncCallback when the user callback returns.
|
||||
func dispatchFunc(
|
||||
ctx context.Context,
|
||||
st dispatchFuncState,
|
||||
vals *queue[DeliveredEvent],
|
||||
acceptCh func() chan DeliveredEvent,
|
||||
snapshot chan chan []DeliveredEvent,
|
||||
callDone chan struct{},
|
||||
) bool {
|
||||
start := time.Now()
|
||||
s.slow.Reset(slowSubscriberTimeout)
|
||||
defer s.slow.Stop()
|
||||
st.slow.Reset(slowSubscriberTimeout)
|
||||
defer st.slow.Stop()
|
||||
|
||||
// Keep the cases in this select in sync with subscribeState.pump
|
||||
// above. The only difference should be that this select
|
||||
// delivers a value by calling s.read.
|
||||
// delivers a value by calling the user callback (via the
|
||||
// goroutine spawned by the typed wrapper).
|
||||
for {
|
||||
select {
|
||||
case <-callDone:
|
||||
@ -327,30 +376,35 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
|
||||
vals.Add(val)
|
||||
case <-ctx.Done():
|
||||
// Wait for the callback to be complete, but not forever.
|
||||
s.slow.Reset(5 * slowSubscriberTimeout)
|
||||
st.slow.Reset(5 * slowSubscriberTimeout)
|
||||
select {
|
||||
case <-s.slow.C:
|
||||
s.logf("giving up on subscriber for %T after %v at close", t, time.Since(start))
|
||||
case <-st.slow.C:
|
||||
st.logf("giving up on subscriber for %s after %v at close", st.typeName, time.Since(start))
|
||||
if cibuild.On() {
|
||||
all := make([]byte, 2<<20)
|
||||
n := runtime.Stack(all, true)
|
||||
s.logf("goroutine stacks:\n%s", all[:n])
|
||||
st.logf("goroutine stacks:\n%s", all[:n])
|
||||
}
|
||||
case <-callDone:
|
||||
}
|
||||
return false
|
||||
case ch := <-snapshot:
|
||||
ch <- vals.Snapshot()
|
||||
case <-s.slow.C:
|
||||
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
||||
s.slow.Reset(slowSubscriberTimeout)
|
||||
case <-st.slow.C:
|
||||
st.logf("subscriber for %s is slow (%v elapsed)", st.typeName, time.Since(start))
|
||||
st.slow.Reset(slowSubscriberTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runCallback invokes the callback on v and closes ch when it returns.
|
||||
// This should be run in a goroutine.
|
||||
func (s *SubscriberFunc[T]) runCallback(v T, ch chan struct{}) {
|
||||
defer close(ch)
|
||||
s.read(v)
|
||||
// runFuncCallback runs f(t) and closes done when it returns. It is
// the per-T worker spawned as a goroutine for each dispatched event.
//
// It is kept as a regular generic function (rather than a closure) so
// that `go runFuncCallback(f, t, done)` binds its arguments to the
// goroutine's frame directly, with no per-event closure allocation.
// The body is small (defer + one indirect call), so the per-shape
// stencil cost is minimal.
func runFuncCallback[T any](f func(T), t T, done chan struct{}) {
	// Deferred so that done is closed on every exit from f, including
	// a panic unwinding through this frame.
	defer close(done)
	f(t)
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user