mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-25 06:11:01 +02:00 
			
		
		
		
	This lets debug tools list the types that clients are wielding, so that they can build a dataflow graph and other debugging views. Updates #15160 Signed-off-by: David Anderson <dave@tailscale.com>
		
			
				
	
	
		
			255 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			255 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| package eventbus
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| type DeliveredEvent struct {
 | |
| 	Event any
 | |
| 	From  *Client
 | |
| 	To    *Client
 | |
| }
 | |
| 
 | |
| // subscriber is a uniformly typed wrapper around Subscriber[T], so
 | |
| // that debugging facilities can look at active subscribers.
 | |
| type subscriber interface {
 | |
| 	subscribeType() reflect.Type
 | |
| 	// dispatch is a function that dispatches the head value in vals to
 | |
| 	// a subscriber, while also handling stop and incoming queue write
 | |
| 	// events.
 | |
| 	//
 | |
| 	// dispatch exists because of the strongly typed Subscriber[T]
 | |
| 	// wrapper around subscriptions: within the bus events are boxed in an
 | |
| 	// 'any', and need to be unpacked to their full type before delivery
 | |
| 	// to the subscriber. This involves writing to a strongly-typed
 | |
| 	// channel, so subscribeState cannot handle that dispatch by itself -
 | |
| 	// but if that strongly typed send blocks, we also need to keep
 | |
| 	// processing other potential sources of wakeups, which is how we end
 | |
| 	// up at this awkward type signature and sharing of internal state
 | |
| 	// through dispatch.
 | |
| 	dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool
 | |
| 	Close()
 | |
| }
 | |
| 
 | |
| // subscribeState handles dispatching of events received from a Bus.
 | |
| type subscribeState struct {
 | |
| 	client *Client
 | |
| 
 | |
| 	dispatcher *worker
 | |
| 	write      chan DeliveredEvent
 | |
| 	snapshot   chan chan []DeliveredEvent
 | |
| 	debug      hook[DeliveredEvent]
 | |
| 
 | |
| 	outputsMu sync.Mutex
 | |
| 	outputs   map[reflect.Type]subscriber
 | |
| }
 | |
| 
 | |
| func newSubscribeState(c *Client) *subscribeState {
 | |
| 	ret := &subscribeState{
 | |
| 		client:   c,
 | |
| 		write:    make(chan DeliveredEvent),
 | |
| 		snapshot: make(chan chan []DeliveredEvent),
 | |
| 		outputs:  map[reflect.Type]subscriber{},
 | |
| 	}
 | |
| 	ret.dispatcher = runWorker(ret.pump)
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| func (q *subscribeState) pump(ctx context.Context) {
 | |
| 	var vals queue[DeliveredEvent]
 | |
| 	acceptCh := func() chan DeliveredEvent {
 | |
| 		if vals.Full() {
 | |
| 			return nil
 | |
| 		}
 | |
| 		return q.write
 | |
| 	}
 | |
| 	for {
 | |
| 		if !vals.Empty() {
 | |
| 			val := vals.Peek()
 | |
| 			sub := q.subscriberFor(val.Event)
 | |
| 			if sub == nil {
 | |
| 				// Raced with unsubscribe.
 | |
| 				vals.Drop()
 | |
| 				continue
 | |
| 			}
 | |
| 			if !sub.dispatch(ctx, &vals, acceptCh, q.snapshot) {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			if q.debug.active() {
 | |
| 				q.debug.run(DeliveredEvent{
 | |
| 					Event: val.Event,
 | |
| 					From:  val.From,
 | |
| 					To:    q.client,
 | |
| 				})
 | |
| 			}
 | |
| 		} else {
 | |
| 			// Keep the cases in this select in sync with
 | |
| 			// Subscriber.dispatch below. The only different should be
 | |
| 			// that this select doesn't deliver queued values to
 | |
| 			// anyone, and unconditionally accepts new values.
 | |
| 			select {
 | |
| 			case val := <-q.write:
 | |
| 				vals.Add(val)
 | |
| 			case <-ctx.Done():
 | |
| 				return
 | |
| 			case ch := <-q.snapshot:
 | |
| 				ch <- vals.Snapshot()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *subscribeState) snapshotQueue() []DeliveredEvent {
 | |
| 	if s == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	resp := make(chan []DeliveredEvent)
 | |
| 	select {
 | |
| 	case s.snapshot <- resp:
 | |
| 		return <-resp
 | |
| 	case <-s.dispatcher.Done():
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *subscribeState) subscribeTypes() []reflect.Type {
 | |
| 	if s == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	s.outputsMu.Lock()
 | |
| 	defer s.outputsMu.Unlock()
 | |
| 	ret := make([]reflect.Type, 0, len(s.outputs))
 | |
| 	for t := range s.outputs {
 | |
| 		ret = append(ret, t)
 | |
| 	}
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) {
 | |
| 	s.outputsMu.Lock()
 | |
| 	defer s.outputsMu.Unlock()
 | |
| 	if s.outputs[t] != nil {
 | |
| 		panic(fmt.Errorf("double subscription for event %s", t))
 | |
| 	}
 | |
| 	s.outputs[t] = sub
 | |
| 	s.client.addSubscriber(t, s)
 | |
| }
 | |
| 
 | |
| func (s *subscribeState) deleteSubscriber(t reflect.Type) {
 | |
| 	s.outputsMu.Lock()
 | |
| 	defer s.outputsMu.Unlock()
 | |
| 	delete(s.outputs, t)
 | |
| 	s.client.deleteSubscriber(t, s)
 | |
| }
 | |
| 
 | |
| func (q *subscribeState) subscriberFor(val any) subscriber {
 | |
| 	q.outputsMu.Lock()
 | |
| 	defer q.outputsMu.Unlock()
 | |
| 	return q.outputs[reflect.TypeOf(val)]
 | |
| }
 | |
| 
 | |
| // Close closes the subscribeState. Implicitly closes all Subscribers
 | |
| // linked to this state, and any pending events are discarded.
 | |
| func (s *subscribeState) close() {
 | |
| 	s.dispatcher.StopAndWait()
 | |
| 
 | |
| 	var subs map[reflect.Type]subscriber
 | |
| 	s.outputsMu.Lock()
 | |
| 	subs, s.outputs = s.outputs, nil
 | |
| 	s.outputsMu.Unlock()
 | |
| 	for _, sub := range subs {
 | |
| 		sub.Close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *subscribeState) closed() <-chan struct{} {
 | |
| 	return s.dispatcher.Done()
 | |
| }
 | |
| 
 | |
| // A Subscriber delivers one type of event from a [Client].
 | |
| type Subscriber[T any] struct {
 | |
| 	stop       stopFlag
 | |
| 	read       chan T
 | |
| 	unregister func()
 | |
| }
 | |
| 
 | |
| func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
 | |
| 	t := reflect.TypeFor[T]()
 | |
| 
 | |
| 	ret := &Subscriber[T]{
 | |
| 		read:       make(chan T),
 | |
| 		unregister: func() { r.deleteSubscriber(t) },
 | |
| 	}
 | |
| 	r.addSubscriber(t, ret)
 | |
| 
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
 | |
| 	ret := &Subscriber[T]{
 | |
| 		read: make(chan T, 100), // arbitrary, large
 | |
| 	}
 | |
| 	ret.unregister = attach(ret.monitor)
 | |
| 	return ret
 | |
| }
 | |
| 
 | |
| func (s *Subscriber[T]) subscribeType() reflect.Type {
 | |
| 	return reflect.TypeFor[T]()
 | |
| }
 | |
| 
 | |
| func (s *Subscriber[T]) monitor(debugEvent T) {
 | |
| 	select {
 | |
| 	case s.read <- debugEvent:
 | |
| 	case <-s.stop.Done():
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
 | |
| 	t := vals.Peek().Event.(T)
 | |
| 	for {
 | |
| 		// Keep the cases in this select in sync with subscribeState.pump
 | |
| 		// above. The only different should be that this select
 | |
| 		// delivers a value on s.read.
 | |
| 		select {
 | |
| 		case s.read <- t:
 | |
| 			vals.Drop()
 | |
| 			return true
 | |
| 		case val := <-acceptCh():
 | |
| 			vals.Add(val)
 | |
| 		case <-ctx.Done():
 | |
| 			return false
 | |
| 		case ch := <-snapshot:
 | |
| 			ch <- vals.Snapshot()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Events returns a channel on which the subscriber's events are
 | |
| // delivered.
 | |
| func (s *Subscriber[T]) Events() <-chan T {
 | |
| 	return s.read
 | |
| }
 | |
| 
 | |
| // Done returns a channel that is closed when the subscriber is
 | |
| // closed.
 | |
| func (s *Subscriber[T]) Done() <-chan struct{} {
 | |
| 	return s.stop.Done()
 | |
| }
 | |
| 
 | |
| // Close closes the Subscriber, indicating the caller no longer wishes
 | |
| // to receive this event type. After Close, receives on
 | |
| // [Subscriber.Events] block for ever.
 | |
| func (s *Subscriber[T]) Close() {
 | |
| 	s.stop.Stop() // unblock receivers
 | |
| 	s.unregister()
 | |
| }
 |