mirror of
https://github.com/tailscale/tailscale.git
synced 2025-09-21 05:31:36 +02:00
Subscribers already have a Done channel that the caller can use to detect when the subscriber has been closed. Typically this happens when the governing Client closes, which in turn is typically because the Bus closed. But clients and subscribers can stop at other times too, and a caller has no good way to tell the difference between "this subscriber closed but the rest are OK" and "the client closed and all these subscribers are finished". We've worked around this in practice by knowing the closure of one subscriber implies the fate of the rest, but we can do better: Add a Done method to the Client that allows us to tell when that has been closed explicitly, after all the publishers and subscribers associated with that client have been closed. This allows the caller to be sure that, by the time that occurs, no further pending events are forthcoming on that client. Updates #15160 Change-Id: Id601a79ba043365ecdb47dd035f1fdadd984f303 Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
136 lines
3.0 KiB
Go
136 lines
3.0 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package eventbus
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
|
|
"tailscale.com/util/set"
|
|
)
|
|
|
|
// A Client can publish and subscribe to events on its attached
|
|
// bus. See [Publish] to publish events, and [Subscribe] to receive
|
|
// events.
|
|
//
|
|
// Subscribers that share the same client receive events one at a
|
|
// time, in the order they were published.
|
|
type Client struct {
|
|
name string
|
|
bus *Bus
|
|
publishDebug hook[PublishedEvent]
|
|
|
|
mu sync.Mutex
|
|
pub set.Set[publisher]
|
|
sub *subscribeState // Lazily created on first subscribe
|
|
stop stopFlag // signaled on Close
|
|
}
|
|
|
|
// Name returns the name stored in the client.
func (c *Client) Name() string {
	return c.name
}
|
|
|
|
// Close closes the client. Implicitly closes all publishers and
|
|
// subscribers obtained from this client.
|
|
func (c *Client) Close() {
|
|
var (
|
|
pub set.Set[publisher]
|
|
sub *subscribeState
|
|
)
|
|
|
|
c.mu.Lock()
|
|
pub, c.pub = c.pub, nil
|
|
sub, c.sub = c.sub, nil
|
|
c.mu.Unlock()
|
|
|
|
if sub != nil {
|
|
sub.close()
|
|
}
|
|
for p := range pub {
|
|
p.Close()
|
|
}
|
|
c.stop.Stop()
|
|
}
|
|
|
|
// Done returns a channel that is closed when [Client.Close] is called.
|
|
// The channel is closed after all the publishers and subscribers governed by
|
|
// the client have been closed.
|
|
func (c *Client) Done() <-chan struct{} { return c.stop.Done() }
|
|
|
|
func (c *Client) snapshotSubscribeQueue() []DeliveredEvent {
|
|
return c.peekSubscribeState().snapshotQueue()
|
|
}
|
|
|
|
func (c *Client) peekSubscribeState() *subscribeState {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.sub
|
|
}
|
|
|
|
func (c *Client) publishTypes() []reflect.Type {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
ret := make([]reflect.Type, 0, len(c.pub))
|
|
for pub := range c.pub {
|
|
ret = append(ret, pub.publishType())
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func (c *Client) subscribeTypes() []reflect.Type {
|
|
return c.peekSubscribeState().subscribeTypes()
|
|
}
|
|
|
|
func (c *Client) subscribeState() *subscribeState {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.sub == nil {
|
|
c.sub = newSubscribeState(c)
|
|
}
|
|
return c.sub
|
|
}
|
|
|
|
func (c *Client) addPublisher(pub publisher) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.pub.Add(pub)
|
|
}
|
|
|
|
func (c *Client) deletePublisher(pub publisher) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.pub.Delete(pub)
|
|
}
|
|
|
|
func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) {
|
|
c.bus.subscribe(t, s)
|
|
}
|
|
|
|
func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
|
|
c.bus.unsubscribe(t, s)
|
|
}
|
|
|
|
func (c *Client) publish() chan<- PublishedEvent {
|
|
return c.bus.write
|
|
}
|
|
|
|
func (c *Client) shouldPublish(t reflect.Type) bool {
|
|
return c.publishDebug.active() || c.bus.shouldPublish(t)
|
|
}
|
|
|
|
// Subscribe requests delivery of events of type T through the given
|
|
// Queue. Panics if the queue already has a subscriber for T.
|
|
func Subscribe[T any](c *Client) *Subscriber[T] {
|
|
r := c.subscribeState()
|
|
s := newSubscriber[T](r)
|
|
r.addSubscriber(s)
|
|
return s
|
|
}
|
|
|
|
// Publish returns a publisher for event type T using the given
|
|
// client.
|
|
func Publish[T any](c *Client) *Publisher[T] {
|
|
p := newPublisher[T](c)
|
|
c.addPublisher(p)
|
|
return p
|
|
}
|