From 4eec4423b42a22e5331e99240baa75fd3d6b6f3b Mon Sep 17 00:00:00 2001 From: James Tucker Date: Mon, 4 May 2026 21:20:59 +0000 Subject: [PATCH] util/eventbus: move Publisher publisher-interface impl to a non-generic core Mirrors the same refactor previously applied to SubscriberFunc: - Publisher[T]: a thin user-facing facade. Holds a pointer to a non-generic publisherCore and exposes Publish/Close/ShouldPublish. - publisherCore: a non-generic struct that owns the *Client back- pointer, stop flag, and cached reflect.Type. It implements the package-private publisher interface (publishType, Close). The bus's per-Client publisher set is set.Set[publisher] keyed on this single non-generic type. The publisher interface only exists to support diagnostic introspection (Debugger.PublishTypes returning the list of types a client publishes). Previously, satisfying that diagnostic-only interface forced *Publisher[T] to be the implementor and cost a per-T itab, generic dictionary, and equality function on every event type ever passed through Publish[T]. Moving the implementation to a non-generic core lets the diagnostic surface work unchanged while charging zero per-T cost for the diagnostic-driven generic interface. Publisher[T].Publish is also slimmed: the channel/select/stopFlag loop is now a non-generic publish() helper that takes the value as 'any'. The per-T body is reduced to forwarding the boxed value to the helper. Measured impact (util/eventbus/sizetest): total per-flow binary cost: linux/amd64: 2252.8 B/flow -> 1900.5 B/flow (-352.3 B / -15.6%) linux/arm64: 2228.2 B/flow -> 1835.0 B/flow (-393.2 B / -17.6%) Publisher per-receiver attribution: linux/amd64: 635.2 B/flow -> 369.6 B/flow (-265.6 B / -41.8%) linux/arm64: 751.7 B/flow -> 373.2 B/flow (-378.5 B / -50.4%) Cumulative reduction from the original baseline (5167ff412): linux/amd64: 3096.6 B/flow -> 1900.5 B/flow (-1196.1 B / -38.6%) linux/arm64: 3145.7 B/flow -> 1835.0 B/flow (-1310.7 B / -41.7%) Dropped per-T symbols (200-flow eventbus binary): - .dict.Publisher[T] was 14,400 B (72 B/T) - type:.eq.Publisher[T] was 11,832 B (58 B/T) - go:itab.*Publisher[T],publisher was 8,000 B (40 B/T) - (*Publisher[T]).Close shape stencils collapsed to 1 Behavior is unchanged: BenchmarkBasicThroughput is within noise (2018 -> 2038 ns/op at -benchtime=2s) and all eventbus tests pass. Updates #12614 Change-Id: I61979c2bf95d2a711c2321e6e0b4b7d15980e9f5 Signed-off-by: James Tucker --- util/eventbus/client.go | 7 ++++- util/eventbus/publish.go | 67 +++++++++++++++++++++++++++++++--------- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/util/eventbus/client.go b/util/eventbus/client.go index e2806ffd8..0168acdd4 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -181,6 +181,11 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { // It panics if c is closed. func Publish[T any](c *Client) *Publisher[T] { p := newPublisher[T](c) - c.addPublisher(p) + // Register the non-generic core with the client so the + // per-Client publisher set, the publisher interface itab, and + // the publisher equality function are not parameterized by T. + // This eliminates per-T itab/dictionary/eq cost for every new + // event type passed through Publish[T]. + c.addPublisher(p.core) return p } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index f6fd029b7..fd037ac34 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -7,8 +7,13 @@ import ( "reflect" ) -// publisher is a uniformly typed wrapper around Publisher[T], so that -// debugging facilities can look at active publishers. +// publisher is a uniformly typed wrapper around publisherCore so that +// debugging facilities can enumerate active publishers on a [Client] +// and report the types each one publishes. The interface is +// implemented by the non-generic *publisherCore (not by the typed +// user-facing *Publisher[T]); this keeps the bus's per-Client +// publisher set, and the publisher itab/dictionary, free of +// per-T duplication. type publisher interface { publishType() reflect.Type Close() @@ -16,12 +21,35 @@ type publisher interface { // A Publisher publishes typed events on a bus. type Publisher[T any] struct { + // Implementation note: Publisher[T] is a thin user-facing facade over a + // non-generic *publisherCore. Carrying T on the public type preserves the + // typed API of Publish(v T), but all of the actual state (the *Client + // back-pointer, the stop flag, and the cached reflect.Type used by + // diagnostic introspection) lives on the core and is not duplicated per T. + // + // The diagnostic surface that motivates the publisher interface + // (Debugger.PublishTypes) is served by *publisherCore directly, so adding + // new typed publishers does not pay an itab+dictionary cost just to satisfy + // diagnostic enumeration. + core *publisherCore +} + +// publisherCore is the non-generic implementation of a Publisher. +// It implements the package-private publisher interface; the bus's +// outputs map and itab key on this single type, not on Publisher[T]. +type publisherCore struct { client *Client stop stopFlag + typ reflect.Type // cached reflect.TypeFor[T]() } func newPublisher[T any](c *Client) *Publisher[T] { - return &Publisher[T]{client: c} + return &Publisher[T]{ + core: &publisherCore{ + client: c, + typ: reflect.TypeFor[T](), + }, + } } // Close closes the publisher. @@ -31,35 +59,46 @@ func newPublisher[T any](c *Client) *Publisher[T] { // If the Bus or Client from which the Publisher was created is closed, // the Publisher is implicitly closed and does not need to be closed // separately. -func (p *Publisher[T]) Close() { +func (p *Publisher[T]) Close() { p.core.Close() } + +// Close implements the publisher interface and the user-facing +// (*Publisher[T]).Close. +func (c *publisherCore) Close() { // Just unblocks any active calls to Publish, no other // synchronization needed. - p.stop.Stop() - p.client.deletePublisher(p) + c.stop.Stop() + c.client.deletePublisher(c) } -func (p *Publisher[T]) publishType() reflect.Type { - return reflect.TypeFor[T]() -} +// publishType implements the publisher interface. +func (c *publisherCore) publishType() reflect.Type { return c.typ } // Publish publishes event v on the bus. func (p *Publisher[T]) Publish(v T) { + publish(p.core, v) +} + +// publish is the non-generic body of Publisher[T].Publish. The only +// per-T work is the boxing of v into evt.Event (an `any` field) and +// the construction of the PublishedEvent struct itself; all of the +// channel/select dance is shared across every T. +func publish(c *publisherCore, v any) { // Check for just a stopped publisher or bus before trying to // write, so that once closed Publish consistently does nothing. select { - case <-p.stop.Done(): + case <-c.stop.Done(): return default: } evt := PublishedEvent{ Event: v, - From: p.client, + From: c.client, } select { - case p.client.publish() <- evt: - case <-p.stop.Done(): + case c.client.publish() <- evt: + case <-c.stop.Done(): } } @@ -70,5 +109,5 @@ func (p *Publisher[T]) Publish(v T) { // nobody seems to care. Publishers must not assume that someone will // definitely receive an event if ShouldPublish returns true. func (p *Publisher[T]) ShouldPublish() bool { - return p.client.shouldPublish(reflect.TypeFor[T]()) + return p.core.client.shouldPublish(p.core.typ) }