diff --git a/flake.nix b/flake.nix index da4c87a0b..e50f39638 100644 --- a/flake.nix +++ b/flake.nix @@ -151,5 +151,5 @@ }); }; } -# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +# nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/go.mod b/go.mod index 12f7946b8..836810fc0 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/snappy v0.0.4 github.com/golangci/golangci-lint v1.57.1 - github.com/google/go-cmp v0.6.0 + github.com/google/go-cmp v0.7.0 github.com/google/go-containerregistry v0.20.3 github.com/google/go-tpm v0.9.4 github.com/google/gopacket v1.1.19 diff --git a/go.mod.sri b/go.mod.sri index c9f537473..108423f4e 100644 --- a/go.mod.sri +++ b/go.mod.sri @@ -1 +1 @@ -sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/go.sum b/go.sum index eea0d6c7d..a0d9461ec 100644 --- a/go.sum +++ b/go.sum @@ -492,8 +492,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-containerregistry v0.20.3 h1:oNx7IdTI936V8CQRveCjaxOiegWwvM7kqkbXTpyiovI= github.com/google/go-containerregistry v0.20.3/go.mod h1:w00pIgBRDVUDFM6bq+Qx8lwNWK+cxgCuX1vd3PIBDNI= github.com/google/go-github/v66 v66.0.0 h1:ADJsaXj9UotwdgK8/iFZtv7MLc8E8WBl62WLd/D/9+M= diff --git a/shell.nix b/shell.nix index 99cfbd243..6b579b455 100644 --- a/shell.nix +++ b/shell.nix @@ -16,4 +16,4 @@ ) { src = ./.; }).shellNix -# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +# nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index d1507d8e6..b1639136a 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -5,10 +5,12 @@ package eventbus import ( "context" + "log" "reflect" "slices" "sync" + "tailscale.com/types/logger" "tailscale.com/util/set" ) @@ -30,6 +32,7 @@ type Bus struct { write chan PublishedEvent snapshot chan chan []PublishedEvent routeDebug hook[RoutedEvent] + logf logger.Logf topicsMu sync.Mutex topics map[reflect.Type][]*subscribeState @@ -40,19 +43,42 @@ type Bus struct { clients set.Set[*Client] } -// New returns a new bus. Use [Publish] to make event publishers, -// and [Subscribe] and [SubscribeFunc] to make event subscribers. -func New() *Bus { +// New returns a new bus with default options. It is equivalent to +// calling [NewWithOptions] with zero [BusOptions]. +func New() *Bus { return NewWithOptions(BusOptions{}) } + +// NewWithOptions returns a new [Bus] with the specified [BusOptions]. +// Use [Bus.Client] to construct clients on the bus. +// Use [Publish] to make event publishers. +// Use [Subscribe] and [SubscribeFunc] to make event subscribers. +func NewWithOptions(opts BusOptions) *Bus { ret := &Bus{ write: make(chan PublishedEvent), snapshot: make(chan chan []PublishedEvent), topics: map[reflect.Type][]*subscribeState{}, clients: set.Set[*Client]{}, + logf: opts.logger(), } ret.router = runWorker(ret.pump) return ret } +// BusOptions are optional parameters for a [Bus]. A zero value is ready for +// use and provides defaults as described. +type BusOptions struct { + // Logf, if non-nil, is used for debug logs emitted by the bus and clients, + // publishers, and subscribers under its care. If it is nil, logs are sent + // to [log.Printf]. + Logf logger.Logf +} + +func (o BusOptions) logger() logger.Logf { + if o.Logf == nil { + return log.Printf + } + return o.Logf +} + // Client returns a new client with no subscriptions. Use [Subscribe] // to receive events, and [Publish] to emit events. // @@ -166,6 +192,9 @@ func (b *Bus) pump(ctx context.Context) { } } +// logger returns a [logger.Logf] to which logs related to bus activity should be written. +func (b *Bus) logger() logger.Logf { return b.logf } + func (b *Bus) dest(t reflect.Type) []*subscribeState { b.topicsMu.Lock() defer b.topicsMu.Unlock() diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index de292cf1a..1e0cd8abf 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -4,8 +4,11 @@ package eventbus_test import ( + "bytes" "errors" "fmt" + "log" + "regexp" "testing" "testing/synctest" "time" @@ -436,6 +439,76 @@ func TestMonitor(t *testing.T) { t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() })) } +func TestSlowSubs(t *testing.T) { + swapLogBuf := func(t *testing.T) *bytes.Buffer { + logBuf := new(bytes.Buffer) + save := log.Writer() + log.SetOutput(logBuf) + t.Cleanup(func() { log.SetOutput(save) }) + return logBuf + } + + t.Run("Subscriber", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + pc := b.Client("pub") + p := eventbus.Publish[EventA](pc) + + sc := b.Client("sub") + s := eventbus.Subscribe[EventA](sc) + + go func() { + time.Sleep(6 * time.Second) // trigger the slow check at 5s. + t.Logf("Subscriber accepted %v", <-s.Events()) + }() + + p.Publish(EventA{12345}) + + time.Sleep(7 * time.Second) // advance time... + synctest.Wait() // subscriber is done + + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `subscriber for eventbus_test.EventA is slow.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want) + } + }) + }) + + t.Run("SubscriberFunc", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + pc := b.Client("pub") + p := eventbus.Publish[EventB](pc) + + sc := b.Client("sub") + eventbus.SubscribeFunc[EventB](sc, func(e EventB) { + time.Sleep(6 * time.Second) // trigger the slow check at 5s. + t.Logf("SubscriberFunc processed %v", e) + }) + + p.Publish(EventB{67890}) + + time.Sleep(7 * time.Second) // advance time... + synctest.Wait() // subscriber is done + + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `subscriber for eventbus_test.EventB is slow.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want) + } + }) + }) +} + func TestRegression(t *testing.T) { bus := eventbus.New() t.Cleanup(bus.Close) diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 9e3f3ee76..c119c67a9 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -7,6 +7,7 @@ import ( "reflect" "sync" + "tailscale.com/types/logger" "tailscale.com/util/set" ) @@ -29,6 +30,8 @@ type Client struct { func (c *Client) Name() string { return c.name } +func (c *Client) logger() logger.Logf { return c.bus.logger() } + // Close closes the client. It implicitly closes all publishers and // subscribers obtained from this client. func (c *Client) Close() { @@ -142,7 +145,7 @@ func Subscribe[T any](c *Client) *Subscriber[T] { } r := c.subscribeStateLocked() - s := newSubscriber[T](r) + s := newSubscriber[T](r, logfForCaller(c.logger())) r.addSubscriber(s) return s } @@ -165,7 +168,7 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { } r := c.subscribeStateLocked() - s := newSubscriberFunc[T](r, f) + s := newSubscriberFunc[T](r, f, logfForCaller(c.logger())) r.addSubscriber(s) return s } diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 6d5463bec..2f2c9589a 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -6,12 +6,22 @@ package eventbus import ( "cmp" "fmt" + "path/filepath" "reflect" + "runtime" "slices" + "strings" "sync" "sync/atomic" + "time" + + "tailscale.com/types/logger" ) +// slowSubscriberTimeout is a timeout after which a subscriber that does not +// accept a pending event will be flagged as being slow. +const slowSubscriberTimeout = 5 * time.Second + // A Debugger offers access to a bus's privileged introspection and // debugging facilities. // @@ -204,3 +214,29 @@ type DebugTopic struct { Publisher string Subscribers []string } + +// logfForCaller returns a [logger.Logf] that prefixes its output with the +// package, filename, and line number of the caller's caller. +// If logf == nil, it returns [logger.Discard]. +// If the caller location could not be determined, it returns logf unmodified. +func logfForCaller(logf logger.Logf) logger.Logf { + if logf == nil { + return logger.Discard + } + pc, fpath, line, _ := runtime.Caller(2) // +1 for my caller, +1 for theirs + if f := runtime.FuncForPC(pc); f != nil { + return logger.WithPrefix(logf, fmt.Sprintf("%s %s:%d: ", funcPackageName(f.Name()), filepath.Base(fpath), line)) + } + return logf +} + +func funcPackageName(funcName string) string { + ls := max(strings.LastIndex(funcName, "/"), 0) + for { + i := strings.LastIndex(funcName, ".") + if i <= ls { + return funcName + } + funcName = funcName[:i] + } +} diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index c35c7e7f0..0b821b3f5 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -8,6 +8,9 @@ import ( "fmt" "reflect" "sync" + "time" + + "tailscale.com/types/logger" ) type DeliveredEvent struct { @@ -182,12 +185,18 @@ type Subscriber[T any] struct { stop stopFlag read chan T unregister func() + logf logger.Logf + slow *time.Timer // used to detect slow subscriber service } -func newSubscriber[T any](r *subscribeState) *Subscriber[T] { +func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { + slow := time.NewTimer(0) + slow.Stop() // reset in dispatch return &Subscriber[T]{ read: make(chan T), unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + logf: logf, + slow: slow, } } @@ -212,6 +221,11 @@ func (s *Subscriber[T]) monitor(debugEvent T) { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) + + start := time.Now() + s.slow.Reset(slowSubscriberTimeout) + defer s.slow.Stop() + for { // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select @@ -226,6 +240,9 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent return false case ch := <-snapshot: ch <- vals.Snapshot() + case <-s.slow.C: + s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) + s.slow.Reset(slowSubscriberTimeout) } } } @@ -260,12 +277,18 @@ type SubscriberFunc[T any] struct { stop stopFlag read func(T) unregister func() + logf logger.Logf + slow *time.Timer // used to detect slow subscriber service } -func newSubscriberFunc[T any](r *subscribeState, f func(T)) *SubscriberFunc[T] { +func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { + slow := time.NewTimer(0) + slow.Stop() // reset in dispatch return &SubscriberFunc[T]{ read: f, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + logf: logf, + slow: slow, } } @@ -285,6 +308,11 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE t := vals.Peek().Event.(T) callDone := make(chan struct{}) go s.runCallback(t, callDone) + + start := time.Now() + s.slow.Reset(slowSubscriberTimeout) + defer s.slow.Stop() + // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select // delivers a value by calling s.read. @@ -299,6 +327,9 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE return false case ch := <-snapshot: ch <- vals.Snapshot() + case <-s.slow.C: + s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) + s.slow.Reset(slowSubscriberTimeout) } } }