util/eventbus: allow logging of slow subscribers (#17705)

Add options to the eventbus.Bus to plumb in a logger.

Route that logger in to the subscriber machinery, and trigger a log message to
it when a subscriber fails to respond to its delivered events for 5s or more.

The log message includes the package, filename, and line number of the call
site that created the subscription.

Add tests that verify this works.

Updates #17680

Change-Id: I0546516476b1e13e6a9cf79f19db2fe55e56c698
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
M. J. Fromberger 2025-10-30 14:40:57 -07:00 committed by GitHub
parent f522b9dbb7
commit 061e6266cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 185 additions and 13 deletions

View File

@ -151,5 +151,5 @@
}); });
}; };
} }
# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= # nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw=

2
go.mod
View File

@ -43,7 +43,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4 github.com/golang/snappy v0.0.4
github.com/golangci/golangci-lint v1.57.1 github.com/golangci/golangci-lint v1.57.1
github.com/google/go-cmp v0.6.0 github.com/google/go-cmp v0.7.0
github.com/google/go-containerregistry v0.20.3 github.com/google/go-containerregistry v0.20.3
github.com/google/go-tpm v0.9.4 github.com/google/go-tpm v0.9.4
github.com/google/gopacket v1.1.19 github.com/google/gopacket v1.1.19

View File

@ -1 +1 @@
sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw=

4
go.sum
View File

@ -492,8 +492,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-containerregistry v0.20.3 h1:oNx7IdTI936V8CQRveCjaxOiegWwvM7kqkbXTpyiovI= github.com/google/go-containerregistry v0.20.3 h1:oNx7IdTI936V8CQRveCjaxOiegWwvM7kqkbXTpyiovI=
github.com/google/go-containerregistry v0.20.3/go.mod h1:w00pIgBRDVUDFM6bq+Qx8lwNWK+cxgCuX1vd3PIBDNI= github.com/google/go-containerregistry v0.20.3/go.mod h1:w00pIgBRDVUDFM6bq+Qx8lwNWK+cxgCuX1vd3PIBDNI=
github.com/google/go-github/v66 v66.0.0 h1:ADJsaXj9UotwdgK8/iFZtv7MLc8E8WBl62WLd/D/9+M= github.com/google/go-github/v66 v66.0.0 h1:ADJsaXj9UotwdgK8/iFZtv7MLc8E8WBl62WLd/D/9+M=

View File

@ -16,4 +16,4 @@
) { ) {
src = ./.; src = ./.;
}).shellNix }).shellNix
# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= # nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw=

View File

@ -5,10 +5,12 @@ package eventbus
import ( import (
"context" "context"
"log"
"reflect" "reflect"
"slices" "slices"
"sync" "sync"
"tailscale.com/types/logger"
"tailscale.com/util/set" "tailscale.com/util/set"
) )
@ -30,6 +32,7 @@ type Bus struct {
write chan PublishedEvent write chan PublishedEvent
snapshot chan chan []PublishedEvent snapshot chan chan []PublishedEvent
routeDebug hook[RoutedEvent] routeDebug hook[RoutedEvent]
logf logger.Logf
topicsMu sync.Mutex topicsMu sync.Mutex
topics map[reflect.Type][]*subscribeState topics map[reflect.Type][]*subscribeState
@ -40,19 +43,42 @@ type Bus struct {
clients set.Set[*Client] clients set.Set[*Client]
} }
// New returns a new bus. Use [Publish] to make event publishers, // New returns a new bus with default options. It is equivalent to
// and [Subscribe] and [SubscribeFunc] to make event subscribers. // calling [NewWithOptions] with zero [BusOptions].
func New() *Bus { func New() *Bus { return NewWithOptions(BusOptions{}) }
// NewWithOptions returns a new [Bus] with the specified [BusOptions].
// Use [Bus.Client] to construct clients on the bus.
// Use [Publish] to make event publishers.
// Use [Subscribe] and [SubscribeFunc] to make event subscribers.
func NewWithOptions(opts BusOptions) *Bus {
ret := &Bus{ ret := &Bus{
write: make(chan PublishedEvent), write: make(chan PublishedEvent),
snapshot: make(chan chan []PublishedEvent), snapshot: make(chan chan []PublishedEvent),
topics: map[reflect.Type][]*subscribeState{}, topics: map[reflect.Type][]*subscribeState{},
clients: set.Set[*Client]{}, clients: set.Set[*Client]{},
logf: opts.logger(),
} }
ret.router = runWorker(ret.pump) ret.router = runWorker(ret.pump)
return ret return ret
} }
// BusOptions are optional parameters for a [Bus]. A zero value is ready for
// use and provides defaults as described.
type BusOptions struct {
// Logf, if non-nil, is used for debug logs emitted by the bus and clients,
// publishers, and subscribers under its care. If it is nil, logs are sent
// to [log.Printf].
Logf logger.Logf
}
func (o BusOptions) logger() logger.Logf {
if o.Logf == nil {
return log.Printf
}
return o.Logf
}
// Client returns a new client with no subscriptions. Use [Subscribe] // Client returns a new client with no subscriptions. Use [Subscribe]
// to receive events, and [Publish] to emit events. // to receive events, and [Publish] to emit events.
// //
@ -166,6 +192,9 @@ func (b *Bus) pump(ctx context.Context) {
} }
} }
// logger returns a [logger.Logf] to which logs related to bus activity should be written.
func (b *Bus) logger() logger.Logf { return b.logf }
func (b *Bus) dest(t reflect.Type) []*subscribeState { func (b *Bus) dest(t reflect.Type) []*subscribeState {
b.topicsMu.Lock() b.topicsMu.Lock()
defer b.topicsMu.Unlock() defer b.topicsMu.Unlock()

View File

@ -4,8 +4,11 @@
package eventbus_test package eventbus_test
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"log"
"regexp"
"testing" "testing"
"testing/synctest" "testing/synctest"
"time" "time"
@ -436,6 +439,76 @@ func TestMonitor(t *testing.T) {
t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() })) t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() }))
} }
func TestSlowSubs(t *testing.T) {
swapLogBuf := func(t *testing.T) *bytes.Buffer {
logBuf := new(bytes.Buffer)
save := log.Writer()
log.SetOutput(logBuf)
t.Cleanup(func() { log.SetOutput(save) })
return logBuf
}
t.Run("Subscriber", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
buf := swapLogBuf(t)
b := eventbus.New()
defer b.Close()
pc := b.Client("pub")
p := eventbus.Publish[EventA](pc)
sc := b.Client("sub")
s := eventbus.Subscribe[EventA](sc)
go func() {
time.Sleep(6 * time.Second) // trigger the slow check at 5s.
t.Logf("Subscriber accepted %v", <-s.Events())
}()
p.Publish(EventA{12345})
time.Sleep(7 * time.Second) // advance time...
synctest.Wait() // subscriber is done
want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` +
`subscriber for eventbus_test.EventA is slow.*`)
if got := buf.String(); !want.MatchString(got) {
t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want)
}
})
})
t.Run("SubscriberFunc", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
buf := swapLogBuf(t)
b := eventbus.New()
defer b.Close()
pc := b.Client("pub")
p := eventbus.Publish[EventB](pc)
sc := b.Client("sub")
eventbus.SubscribeFunc[EventB](sc, func(e EventB) {
time.Sleep(6 * time.Second) // trigger the slow check at 5s.
t.Logf("SubscriberFunc processed %v", e)
})
p.Publish(EventB{67890})
time.Sleep(7 * time.Second) // advance time...
synctest.Wait() // subscriber is done
want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` +
`subscriber for eventbus_test.EventB is slow.*`)
if got := buf.String(); !want.MatchString(got) {
t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want)
}
})
})
}
func TestRegression(t *testing.T) { func TestRegression(t *testing.T) {
bus := eventbus.New() bus := eventbus.New()
t.Cleanup(bus.Close) t.Cleanup(bus.Close)

View File

@ -7,6 +7,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"tailscale.com/types/logger"
"tailscale.com/util/set" "tailscale.com/util/set"
) )
@ -29,6 +30,8 @@ type Client struct {
func (c *Client) Name() string { return c.name } func (c *Client) Name() string { return c.name }
func (c *Client) logger() logger.Logf { return c.bus.logger() }
// Close closes the client. It implicitly closes all publishers and // Close closes the client. It implicitly closes all publishers and
// subscribers obtained from this client. // subscribers obtained from this client.
func (c *Client) Close() { func (c *Client) Close() {
@ -142,7 +145,7 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
} }
r := c.subscribeStateLocked() r := c.subscribeStateLocked()
s := newSubscriber[T](r) s := newSubscriber[T](r, logfForCaller(c.logger()))
r.addSubscriber(s) r.addSubscriber(s)
return s return s
} }
@ -165,7 +168,7 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
} }
r := c.subscribeStateLocked() r := c.subscribeStateLocked()
s := newSubscriberFunc[T](r, f) s := newSubscriberFunc[T](r, f, logfForCaller(c.logger()))
r.addSubscriber(s) r.addSubscriber(s)
return s return s
} }

View File

@ -6,12 +6,22 @@ package eventbus
import ( import (
"cmp" "cmp"
"fmt" "fmt"
"path/filepath"
"reflect" "reflect"
"runtime"
"slices" "slices"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"tailscale.com/types/logger"
) )
// slowSubscriberTimeout is a timeout after which a subscriber that does not
// accept a pending event will be flagged as being slow.
const slowSubscriberTimeout = 5 * time.Second
// A Debugger offers access to a bus's privileged introspection and // A Debugger offers access to a bus's privileged introspection and
// debugging facilities. // debugging facilities.
// //
@ -204,3 +214,29 @@ type DebugTopic struct {
Publisher string Publisher string
Subscribers []string Subscribers []string
} }
// logfForCaller returns a [logger.Logf] that prefixes its output with the
// package, filename, and line number of the caller's caller.
// If logf == nil, it returns [logger.Discard].
// If the caller location could not be determined, it returns logf unmodified.
func logfForCaller(logf logger.Logf) logger.Logf {
if logf == nil {
return logger.Discard
}
pc, fpath, line, _ := runtime.Caller(2) // +1 for my caller, +1 for theirs
if f := runtime.FuncForPC(pc); f != nil {
return logger.WithPrefix(logf, fmt.Sprintf("%s %s:%d: ", funcPackageName(f.Name()), filepath.Base(fpath), line))
}
return logf
}
func funcPackageName(funcName string) string {
ls := max(strings.LastIndex(funcName, "/"), 0)
for {
i := strings.LastIndex(funcName, ".")
if i <= ls {
return funcName
}
funcName = funcName[:i]
}
}

View File

@ -8,6 +8,9 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"time"
"tailscale.com/types/logger"
) )
type DeliveredEvent struct { type DeliveredEvent struct {
@ -182,12 +185,18 @@ type Subscriber[T any] struct {
stop stopFlag stop stopFlag
read chan T read chan T
unregister func() unregister func()
logf logger.Logf
slow *time.Timer // used to detect slow subscriber service
} }
func newSubscriber[T any](r *subscribeState) *Subscriber[T] { func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
return &Subscriber[T]{ return &Subscriber[T]{
read: make(chan T), read: make(chan T),
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
} }
} }
@ -212,6 +221,11 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T) t := vals.Peek().Event.(T)
start := time.Now()
s.slow.Reset(slowSubscriberTimeout)
defer s.slow.Stop()
for { for {
// Keep the cases in this select in sync with subscribeState.pump // Keep the cases in this select in sync with subscribeState.pump
// above. The only difference should be that this select // above. The only difference should be that this select
@ -226,6 +240,9 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
return false return false
case ch := <-snapshot: case ch := <-snapshot:
ch <- vals.Snapshot() ch <- vals.Snapshot()
case <-s.slow.C:
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
s.slow.Reset(slowSubscriberTimeout)
} }
} }
} }
@ -260,12 +277,18 @@ type SubscriberFunc[T any] struct {
stop stopFlag stop stopFlag
read func(T) read func(T)
unregister func() unregister func()
logf logger.Logf
slow *time.Timer // used to detect slow subscriber service
} }
func newSubscriberFunc[T any](r *subscribeState, f func(T)) *SubscriberFunc[T] { func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
return &SubscriberFunc[T]{ return &SubscriberFunc[T]{
read: f, read: f,
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
} }
} }
@ -285,6 +308,11 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
t := vals.Peek().Event.(T) t := vals.Peek().Event.(T)
callDone := make(chan struct{}) callDone := make(chan struct{})
go s.runCallback(t, callDone) go s.runCallback(t, callDone)
start := time.Now()
s.slow.Reset(slowSubscriberTimeout)
defer s.slow.Stop()
// Keep the cases in this select in sync with subscribeState.pump // Keep the cases in this select in sync with subscribeState.pump
// above. The only difference should be that this select // above. The only difference should be that this select
// delivers a value by calling s.read. // delivers a value by calling s.read.
@ -299,6 +327,9 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
return false return false
case ch := <-snapshot: case ch := <-snapshot:
ch <- vals.Snapshot() ch <- vals.Snapshot()
case <-s.slow.C:
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
s.slow.Reset(slowSubscriberTimeout)
} }
} }
} }