Bounded DeliveredEvent queues reduce memory usage, but they can deadlock under load. Two common scenarios trigger deadlocks when the number of events published in a short period exceeds twice the queue capacity (there is a PublishedEvent queue of the same size):

- a subscriber tries to acquire a mutex that is held by a publisher, or
- a subscriber for A events publishes B events.

Avoiding these scenarios is not practical: it would limit the eventbus's usefulness and reduce its adoption, pushing us back to callbacks and other legacy mechanisms. These deadlocks have already occurred on customer devices, dev machines, and in tests. They also make it harder to identify and fix slow subscribers and similar issues we have been seeing recently.

Choosing an arbitrarily large fixed queue capacity would only mask the problem. A client running in a sufficiently large and complex customer environment can exceed any meaningful constant limit, since event volume depends on the number of peers and other factors. Behavior also changes with how the Go runtime, OS, and hardware schedule publishers and subscribers, as the issue is essentially a race between them. Additionally, on lower-end devices, an unreasonably high constant capacity is practically the same as using unbounded queues.

Therefore, this PR changes the event queue implementation to be unbounded by default. The PublishedEvent queue keeps its existing capacity of 16 items, while subscribers' DeliveredEvent queues become unbounded. This change fixes known deadlocks and makes the system stable under load, at the cost of higher potential memory usage, including cases where a queue grows during an event burst and does not shrink when load decreases. Further improvements can be implemented in the future as needed.

Fixes #17973
Fixes #18012

Signed-off-by: Nick Khyl <nickk@tailscale.com>
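As an illustration of the unbounded per-subscriber queues described above, here is a minimal sketch of the general technique: a goroutine buffers pending items in a growable slice between an inbound and an outbound channel, so the sender (here, the bus router) never blocks on a slow receiver. This is not the queue implementation used in this change; the name unboundedQueue and the surrounding program are hypothetical and for illustration only.

package main

import "fmt"

// unboundedQueue forwards values from in to out, buffering any number of
// pending values in a slice so that sends on in never block.
func unboundedQueue[T any](in <-chan T, out chan<- T) {
	defer close(out)
	var buf []T
	for in != nil || len(buf) > 0 {
		// Only offer the head of the buffer when the buffer is non-empty;
		// a send on a nil channel blocks forever, disabling that case.
		var send chan<- T
		var head T
		if len(buf) > 0 {
			send = out
			head = buf[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed; keep draining the buffer
				continue
			}
			buf = append(buf, v)
		case send <- head:
			buf = buf[1:]
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go unboundedQueue(in, out)
	// Sending never blocks here, even though nothing reads out yet.
	for i := range 5 {
		in <- i
	}
	close(in)
	for v := range out {
		fmt.Println(v)
	}
}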
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package eventbus

import (
	"context"
	"log"
	"reflect"
	"slices"

	"tailscale.com/syncs"
	"tailscale.com/types/logger"
	"tailscale.com/util/set"
)

// PublishedEvent is an event published to the bus, along with the client
// that published it.
type PublishedEvent struct {
	Event any
	From  *Client
}

// RoutedEvent is a published event along with the set of clients it was
// routed to. It is passed to the bus's routing debug hook.
type RoutedEvent struct {
	Event any
	From  *Client
	To    []*Client
}

// Bus is an event bus that distributes published events to interested
// subscribers.
type Bus struct {
	router     *worker
	write      chan PublishedEvent
	snapshot   chan chan []PublishedEvent
	routeDebug hook[RoutedEvent]
	logf       logger.Logf

	topicsMu syncs.Mutex
	topics   map[reflect.Type][]*subscribeState

	// Used for introspection/debugging only, not in the normal event
	// publishing path.
	clientsMu syncs.Mutex
	clients   set.Set[*Client]
}

// New returns a new bus with default options. It is equivalent to
// calling [NewWithOptions] with zero [BusOptions].
func New() *Bus { return NewWithOptions(BusOptions{}) }

// NewWithOptions returns a new [Bus] with the specified [BusOptions].
// Use [Bus.Client] to construct clients on the bus.
// Use [Publish] to make event publishers.
// Use [Subscribe] and [SubscribeFunc] to make event subscribers.
func NewWithOptions(opts BusOptions) *Bus {
	ret := &Bus{
		write:    make(chan PublishedEvent),
		snapshot: make(chan chan []PublishedEvent),
		topics:   map[reflect.Type][]*subscribeState{},
		clients:  set.Set[*Client]{},
		logf:     opts.logger(),
	}
	ret.router = runWorker(ret.pump)
	return ret
}

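// An illustrative usage sketch (for orientation only; publisher and
// subscriber construction via [Publish], [Subscribe], and [SubscribeFunc]
// is defined elsewhere in this package):
//
//	b := eventbus.New() // or NewWithOptions(BusOptions{Logf: log.Printf})
//	defer b.Close()
//	c := b.Client("example-client")
//	// Attach publishers and subscribers to c, then publish events; the bus
//	// routes each event to the subscribers interested in its type.
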
// BusOptions are optional parameters for a [Bus]. A zero value is ready for
// use and provides defaults as described.
type BusOptions struct {
	// Logf, if non-nil, is used for debug logs emitted by the bus and clients,
	// publishers, and subscribers under its care. If it is nil, logs are sent
	// to [log.Printf].
	Logf logger.Logf
}

func (o BusOptions) logger() logger.Logf {
	if o.Logf == nil {
		return log.Printf
	}
	return o.Logf
}

// Client returns a new client with no subscriptions. Use [Subscribe]
// to receive events, and [Publish] to emit events.
//
// The client's name is used only for debugging, to tell humans what
// piece of code a publisher/subscriber belongs to. Aim for something
// short but unique, for example "kernel-route-monitor" or "taildrop",
// not "watcher".
func (b *Bus) Client(name string) *Client {
	ret := &Client{
		name: name,
		bus:  b,
		pub:  set.Set[publisher]{},
	}
	b.clientsMu.Lock()
	defer b.clientsMu.Unlock()
	b.clients.Add(ret)
	return ret
}

// Debugger returns the debugging facility for the bus.
func (b *Bus) Debugger() *Debugger {
	return &Debugger{b}
}

// Close closes the bus. It implicitly closes all clients, publishers and
// subscribers attached to the bus.
//
// Close blocks until the bus is fully shut down. The bus is
// permanently unusable after closing.
func (b *Bus) Close() {
	b.router.StopAndWait()

	b.clientsMu.Lock()
	defer b.clientsMu.Unlock()
	for c := range b.clients {
		c.Close()
	}
	b.clients = nil
}

func (b *Bus) pump(ctx context.Context) {
	// Limit how many published events we can buffer in the PublishedEvent queue.
	//
	// Subscribers have unbounded DeliveredEvent queues (see tailscale/tailscale#18020),
	// so this queue doesn't need to be unbounded. Keeping it bounded may also help
	// catch cases where subscribers stop pumping events completely, such as due to a bug
	// in [subscribeState.pump], [Subscriber.dispatch], or [SubscriberFunc.dispatch].
	const maxPublishedEvents = 16
	vals := queue[PublishedEvent]{capacity: maxPublishedEvents}
	// acceptCh returns the channel to receive newly published events from, or
	// nil when the queue is full; receiving from a nil channel blocks forever,
	// which disables that select case until space frees up.
	acceptCh := func() chan PublishedEvent {
		if vals.Full() {
			return nil
		}
		return b.write
	}
	for {
		// Drain all pending events. Note that while we're draining
		// events into subscriber queues, we continue to
		// opportunistically accept more incoming events, if we have
		// queue space for it.
		for !vals.Empty() {
			val := vals.Peek()
			dests := b.dest(reflect.TypeOf(val.Event))

			if b.routeDebug.active() {
				clients := make([]*Client, len(dests))
				for i := range len(dests) {
					clients[i] = dests[i].client
				}
				b.routeDebug.run(RoutedEvent{
					Event: val.Event,
					From:  val.From,
					To:    clients,
				})
			}

			for _, d := range dests {
				evt := DeliveredEvent{
					Event: val.Event,
					From:  val.From,
					To:    d.client,
				}
			deliverOne:
				for {
					select {
					case d.write <- evt:
						break deliverOne
					case <-d.closed():
						// Queue closed, don't block but continue
						// delivering to others.
						break deliverOne
					case in := <-acceptCh():
						vals.Add(in)
						in.From.publishDebug.run(in)
					case <-ctx.Done():
						return
					case ch := <-b.snapshot:
						ch <- vals.Snapshot()
					}
				}
			}
			vals.Drop()
		}

		// Inbound queue empty, wait for at least 1 work item before
		// resuming.
		for vals.Empty() {
			select {
			case <-ctx.Done():
				return
			case in := <-b.write:
				vals.Add(in)
				in.From.publishDebug.run(in)
			case ch := <-b.snapshot:
				ch <- nil
			}
		}
	}
}

// logger returns a [logger.Logf] to which logs related to bus activity should be written.
func (b *Bus) logger() logger.Logf { return b.logf }

func (b *Bus) dest(t reflect.Type) []*subscribeState {
	b.topicsMu.Lock()
	defer b.topicsMu.Unlock()
	return b.topics[t]
}

func (b *Bus) shouldPublish(t reflect.Type) bool {
	if b.routeDebug.active() {
		return true
	}

	b.topicsMu.Lock()
	defer b.topicsMu.Unlock()
	return len(b.topics[t]) > 0
}

func (b *Bus) listClients() []*Client {
	b.clientsMu.Lock()
	defer b.clientsMu.Unlock()
	return b.clients.Slice()
}

func (b *Bus) snapshotPublishQueue() []PublishedEvent {
	resp := make(chan []PublishedEvent)
	select {
	case b.snapshot <- resp:
		return <-resp
	case <-b.router.Done():
		return nil
	}
}

func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) {
	b.topicsMu.Lock()
	defer b.topicsMu.Unlock()
	b.topics[t] = append(b.topics[t], q)
	return func() {
		b.unsubscribe(t, q)
	}
}

func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) {
	b.topicsMu.Lock()
	defer b.topicsMu.Unlock()
	// Topic slices are accessed by pump without holding a lock, so we
	// have to replace the entire slice when unsubscribing.
	// Unsubscribing should be infrequent enough that this won't
	// matter.
	i := slices.Index(b.topics[t], q)
	if i < 0 {
		return
	}
	b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
}

// A worker runs a worker goroutine and helps coordinate its shutdown.
type worker struct {
	ctx     context.Context
	stop    context.CancelFunc
	stopped chan struct{}
}

// runWorker creates a worker goroutine running fn. The context passed
// to fn is canceled by [worker.Stop].
func runWorker(fn func(context.Context)) *worker {
	ctx, stop := context.WithCancel(context.Background())
	ret := &worker{
		ctx:     ctx,
		stop:    stop,
		stopped: make(chan struct{}),
	}
	go ret.run(fn)
	return ret
}

func (w *worker) run(fn func(context.Context)) {
	defer close(w.stopped)
	fn(w.ctx)
}

// Stop signals the worker goroutine to shut down.
func (w *worker) Stop() { w.stop() }

// Done returns a channel that is closed when the worker goroutine
// exits.
func (w *worker) Done() <-chan struct{} { return w.stopped }

// Wait waits until the worker goroutine has exited.
func (w *worker) Wait() { <-w.stopped }

// StopAndWait signals the worker goroutine to shut down, then waits
// for it to exit.
func (w *worker) StopAndWait() {
	w.stop()
	<-w.stopped
}

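// An illustrative use of worker (a sketch; not taken from existing callers):
//
//	w := runWorker(func(ctx context.Context) {
//		<-ctx.Done() // run until asked to stop
//	})
//	w.StopAndWait() // cancels ctx and waits for fn to return
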
// stopFlag is a value that can be watched for a notification. The
// zero value is ready for use.
//
// The flag is notified by running [stopFlag.Stop]. Stop can be called
// multiple times. Upon the first call to Stop, [stopFlag.Done] is
// closed, all pending [stopFlag.Wait] calls return, and future Wait
// calls return immediately.
//
// A stopFlag can only notify once, and is intended for use as a
// one-way shutdown signal that's lighter than a cancellable
// context.Context.
type stopFlag struct {
	// guards the lazy construction of stopped, and the value of
	// alreadyStopped.
	mu             syncs.Mutex
	stopped        chan struct{}
	alreadyStopped bool
}

func (s *stopFlag) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.alreadyStopped {
		return
	}
	s.alreadyStopped = true
	if s.stopped == nil {
		s.stopped = make(chan struct{})
	}
	close(s.stopped)
}

func (s *stopFlag) Done() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped == nil {
		s.stopped = make(chan struct{})
	}
	return s.stopped
}

func (s *stopFlag) Wait() {
	<-s.Done()
}
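
// An illustrative use of stopFlag (a sketch; not taken from existing callers):
//
//	var done stopFlag
//	go func() {
//		<-done.Done() // wait for the shutdown signal
//		// ...clean up...
//	}()
//	done.Stop() // first call closes Done; later calls are no-ops
//	done.Wait() // returns immediately once Stop has been called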