From 0def0f19bdf150192efe22bca92b6b2649d1eb83 Mon Sep 17 00:00:00 2001 From: James Tucker Date: Mon, 4 May 2026 21:01:15 +0000 Subject: [PATCH] util/eventbus: extract SubscriberFunc.dispatch loop to a non-generic helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The (*SubscriberFunc[T]).dispatch method body — a ~40-line select loop with slow-subscriber timer, snapshot handling, ctx-cancel draining, and a CI stack-dump branch — was previously fully duplicated by the Go compiler for every distinct GC shape of T. None of that body actually depends on T except for the type assertion and the user callback invocation. This change moves the loop body into a non-generic dispatchFunc() helper, leaving (*SubscriberFunc[T]).dispatch as a tiny wrapper that: - performs the vals.Peek().Event.(T) type assertion - spawns the callback goroutine via `go runFuncCallback(s.read, t, callDone)` — a regular generic function call, not a closure, so that `go` binds the args to the goroutine's frame instead of allocating a closure on the heap. This preserves the zero-extra-allocation behavior of the original (*SubscriberFunc[T]).runCallback method. - resolves T's name via reflect.TypeFor[T]().String() (cached on the stack rather than recomputed on each %T formatting) - calls dispatchFunc with the callDone channel The %T formatting in the original logf calls is replaced with %s on the resolved name string, removing per-T fmt instantiations. A new BenchmarkBasicFuncThroughput is added alongside the existing BenchmarkBasicThroughput so per-event allocation behavior on the SubscribeFunc dispatch path is covered by the benchmark suite. Measured impact (util/eventbus/sizetest): SubscriberFunc per-flow attribution: linux/amd64: 912.5 B/flow -> 840.8 B/flow (-71.7 B/flow) linux/arm64: 917.5 B/flow -> 849.9 B/flow (-67.6 B/flow) The total per-flow size delta on amd64 dropped from 3,096.6 B to 3,039.2 B (-57 B/flow). The arm64 total stayed at 3,145.7 B because the linker's page-aligned section sizing absorbed the improvement on this binary; the symcost-attributed per-receiver number is the real signal. Behavior is unchanged: BenchmarkBasicThroughput stays at 0 allocs/op and BenchmarkBasicFuncThroughput holds at the same 2 allocs/op, 144 B/op as the prior eventbus implementation. All eventbus tests pass. Updates #12614 Change-Id: I85f933f50f58cd25bbfe5cc46bdda7aab22f0bf7 Signed-off-by: James Tucker --- util/eventbus/bench_test.go | 21 +++++++++ util/eventbus/subscribe.go | 86 ++++++++++++++++++++++++++++++------- 2 files changed, 91 insertions(+), 16 deletions(-) diff --git a/util/eventbus/bench_test.go b/util/eventbus/bench_test.go index 7cd7a4241..9657c4e6f 100644 --- a/util/eventbus/bench_test.go +++ b/util/eventbus/bench_test.go @@ -39,6 +39,27 @@ func BenchmarkBasicThroughput(b *testing.B) { bus.Close() } +// BenchmarkBasicFuncThroughput is the SubscribeFunc analogue of +// BenchmarkBasicThroughput: one publisher and one SubscribeFunc +// callback, shoveling events as fast as they can through the +// plumbing. Useful for tracking per-event allocation behavior on the +// SubscribeFunc dispatch path. +func BenchmarkBasicFuncThroughput(b *testing.B) { + bus := eventbus.New() + pcli := bus.Client(b.Name() + "-pub") + scli := bus.Client(b.Name() + "-sub") + + type emptyEvent [0]byte + + pub := eventbus.Publish[emptyEvent](pcli) + eventbus.SubscribeFunc(scli, func(emptyEvent) {}) + + for b.Loop() { + pub.Publish(emptyEvent{}) + } + bus.Close() +} + func BenchmarkSubsThroughput(b *testing.B) { bus := eventbus.New() pcli := bus.Client(b.Name() + "-pub") diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 3edf6deb4..d53bce8d4 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -306,18 +306,67 @@ func (s *SubscriberFunc[T]) Close() { s.stop.Stop(); s.unregister() } func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFor[T]() } // dispatch implements part of the subscriber interface. +// +// We deliberately keep this method body small and delegate the +// dispatch loop to dispatchFunc, a non-generic helper. Each +// instantiation of SubscriberFunc[T] otherwise produces a fresh +// stencil of the full ~40-line select loop (including the slow- +// subscriber timer, snapshot handling, and CI stack-dump branch), +// which is responsible for hundreds of bytes of binary size per +// distinct T. By isolating the per-T work to the type assertion +// and the callback closure, only a small fixed-size wrapper is +// emitted per T. func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) callDone := make(chan struct{}) - go s.runCallback(t, callDone) + // Launch the user callback on a goroutine via a generic worker + // rather than via a closure (`go func() { s.read(t) }()` would + // allocate the closure object on the heap on every dispatched + // event). `go runFuncCallback(s.read, t, callDone)` binds its + // arguments directly to the goroutine's frame, leaving allocation + // behavior identical to the original (*SubscriberFunc[T]).runCallback + // method. + go runFuncCallback(s.read, t, callDone) + return dispatchFunc(ctx, dispatchFuncState{ + slow: s.slow, + logf: s.logf, + typeName: reflect.TypeFor[T]().String(), + }, vals, acceptCh, snapshot, callDone) +} +// dispatchFuncState is the non-generic state needed by dispatchFunc. +// Bundling these lets us pass them as one argument and keeps the +// per-T wrapper at the dispatch call site small. +type dispatchFuncState struct { + slow *time.Timer + logf logger.Logf + typeName string // cached reflect.TypeFor[T]().String() +} + +// dispatchFunc is the non-generic body of SubscriberFunc[T].dispatch. +// It is identical in observable behavior to the original loop; the +// only differences are that the dispatched value has already been +// unboxed by the caller (and the user callback is already running +// on its own goroutine, signaling completion via callDone) and the +// type name has already been resolved (and is passed as a string). +// +// callDone is closed by runFuncCallback when the user callback returns. +func dispatchFunc( + ctx context.Context, + st dispatchFuncState, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, + callDone chan struct{}, +) bool { start := time.Now() - s.slow.Reset(slowSubscriberTimeout) - defer s.slow.Stop() + st.slow.Reset(slowSubscriberTimeout) + defer st.slow.Stop() // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select - // delivers a value by calling s.read. + // delivers a value by calling the user callback (via the + // goroutine spawned by the typed wrapper). for { select { case <-callDone: @@ -327,30 +376,35 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE vals.Add(val) case <-ctx.Done(): // Wait for the callback to be complete, but not forever. - s.slow.Reset(5 * slowSubscriberTimeout) + st.slow.Reset(5 * slowSubscriberTimeout) select { - case <-s.slow.C: - s.logf("giving up on subscriber for %T after %v at close", t, time.Since(start)) + case <-st.slow.C: + st.logf("giving up on subscriber for %s after %v at close", st.typeName, time.Since(start)) if cibuild.On() { all := make([]byte, 2<<20) n := runtime.Stack(all, true) - s.logf("goroutine stacks:\n%s", all[:n]) + st.logf("goroutine stacks:\n%s", all[:n]) } case <-callDone: } return false case ch := <-snapshot: ch <- vals.Snapshot() - case <-s.slow.C: - s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) - s.slow.Reset(slowSubscriberTimeout) + case <-st.slow.C: + st.logf("subscriber for %s is slow (%v elapsed)", st.typeName, time.Since(start)) + st.slow.Reset(slowSubscriberTimeout) } } } -// runCallback invokes the callback on v and closes ch when it returns. -// This should be run in a goroutine. -func (s *SubscriberFunc[T]) runCallback(v T, ch chan struct{}) { - defer close(ch) - s.read(v) +// runFuncCallback runs f(t) and closes done when it returns. It is +// the per-T worker spawned as a goroutine for each dispatched +// event. Keeping it as a regular generic function (rather than a +// closure) means `go runFuncCallback(f, t, done)` binds its +// arguments to the goroutine's frame directly, with no per-event +// closure allocation. The body is small (defer + one indirect +// call), so the per-shape stencil cost is minimal. +func runFuncCallback[T any](f func(T), t T, done chan struct{}) { + defer close(done) + f(t) }