mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-05 20:26:47 +02:00
ipn/ipnlocal: serialize authReconfig calls via the event bus
Signed-off-by: Nick Khyl <nickk@tailscale.com>
This commit is contained in:
parent
ad6cf2f8f3
commit
0879cc5bb8
@ -343,7 +343,5 @@ func handleC2NSetNetfilterKind(b *LocalBackend, w http.ResponseWriter, r *http.R
|
||||
return
|
||||
}
|
||||
|
||||
b.authReconfig()
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
@ -188,6 +188,9 @@ type LocalBackend struct {
|
||||
sys *tsd.System
|
||||
eventSubs eventbus.Monitor
|
||||
|
||||
syncPub *eventbus.Publisher[syncPoint[LocalBackend]]
|
||||
authReconfigPub *eventbus.Publisher[authReconfigRequest]
|
||||
|
||||
health *health.Tracker // always non-nil
|
||||
polc policyclient.Client // always non-nil
|
||||
metrics metrics
|
||||
@ -498,6 +501,10 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
||||
needsCaptiveDetection: make(chan bool),
|
||||
}
|
||||
|
||||
ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
|
||||
b.syncPub = eventbus.Publish[syncPoint[LocalBackend]](ec)
|
||||
b.authReconfigPub = eventbus.Publish[authReconfigRequest](ec)
|
||||
|
||||
nb := newNodeBackend(ctx, b.logf, b.sys.Bus.Get())
|
||||
b.currentNodeAtomic.Store(nb)
|
||||
nb.ready()
|
||||
@ -571,7 +578,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
||||
|
||||
// Start the event bus late, once all the assignments above are done.
|
||||
// (See previous race in tailscale/tailscale#17252)
|
||||
ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
|
||||
b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec))
|
||||
|
||||
return b, nil
|
||||
@ -601,6 +607,9 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
portlist = portlistSub.Events()
|
||||
}
|
||||
|
||||
syncSub := eventbus.Subscribe[syncPoint[LocalBackend]](ec)
|
||||
authReconfigSub := eventbus.Subscribe[authReconfigRequest](ec)
|
||||
|
||||
return func(ec *eventbus.Client) {
|
||||
for {
|
||||
select {
|
||||
@ -639,11 +648,52 @@ func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus
|
||||
b.logf("appc: failed to store route info: %v", err)
|
||||
}
|
||||
}
|
||||
case sp := <-syncSub.Events():
|
||||
sp.Signal()
|
||||
|
||||
case ar := <-authReconfigSub.Events():
|
||||
b.authReconfig(ar)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// syncPoint is an event published over an [eventbus.Bus].
|
||||
// It serves as a synchronization point, allowing to wait
|
||||
// until previously published events have been processed.
|
||||
//
|
||||
// T is used only to distinguish between different syncPoint types,
|
||||
// such as syncPoint[LocalBackend] vs syncPoint[nodeBackend].
|
||||
type syncPoint[T any] struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Wait blocks until [syncPoint.Signal] is called or the provided context is done.
|
||||
//
|
||||
// It must not be called with the subscriber's (e.g. LocalBackend's) lock held.
|
||||
func (s syncPoint[T]) Wait(ctx context.Context) error {
|
||||
select {
|
||||
case <-s.done:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// Signal signals the sync point, unblocking the [syncPoint.Wait] call.
|
||||
func (s syncPoint[T]) Signal() {
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// synchronize waits for all [eventbus] events published
|
||||
// prior to this call to be processed by the receiver.
|
||||
// It must not be called with b.mu held.
|
||||
func (b *LocalBackend) synchronize() {
|
||||
sp := syncPoint[LocalBackend]{done: make(chan struct{})}
|
||||
b.syncPub.Publish(sp)
|
||||
sp.Wait(context.Background())
|
||||
}
|
||||
|
||||
func (b *LocalBackend) Clock() tstime.Clock { return b.clock }
|
||||
func (b *LocalBackend) Sys() *tsd.System { return b.sys }
|
||||
|
||||
@ -992,7 +1042,7 @@ func (b *LocalBackend) linkChange(delta *netmon.ChangeDelta) {
|
||||
// TODO(raggi,tailscale/corp#22574): authReconfig should be refactored such that we can call the
|
||||
// necessary operations here and avoid the need for asynchronous behavior that is racy and hard
|
||||
// to test here, and do less extra work in these conditions.
|
||||
b.goTracker.Go(b.authReconfig)
|
||||
b.requestAuthReconfigLocked()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1623,7 +1673,7 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
|
||||
// Auth completed, unblock the engine
|
||||
b.blockEngineUpdates(false)
|
||||
}
|
||||
b.authReconfig()
|
||||
b.requestAuthReconfigLocked()
|
||||
b.send(ipn.Notify{LoginFinished: &empty.Message{}})
|
||||
}
|
||||
|
||||
@ -1808,7 +1858,12 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
|
||||
b.stateMachine()
|
||||
// This is currently (2020-07-28) necessary; conditionally disabling it is fragile!
|
||||
// This is where netmap information gets propagated to router and magicsock.
|
||||
b.authReconfig()
|
||||
b.requestAuthReconfigLocked()
|
||||
|
||||
// We have tests that depend on having auth reconfig happen before
|
||||
// SetControlClientStatus returns, so for now, keep this synchronous.
|
||||
// TODO(nickkhyl): remove this once we fix the tests.
|
||||
b.synchronize()
|
||||
}
|
||||
|
||||
type preferencePolicyInfo struct {
|
||||
@ -4653,6 +4708,8 @@ func (b *LocalBackend) setPrefsLockedOnEntry(newp *ipn.Prefs, unlock unlockOnce)
|
||||
b.resetAlwaysOnOverrideLocked()
|
||||
}
|
||||
|
||||
authReconfig := b.stateForAuthReconfigLocked()
|
||||
|
||||
unlock.UnlockEarly()
|
||||
|
||||
if oldp.ShieldsUp() != newp.ShieldsUp || hostInfoChanged {
|
||||
@ -4671,7 +4728,7 @@ func (b *LocalBackend) setPrefsLockedOnEntry(newp *ipn.Prefs, unlock unlockOnce)
|
||||
if oldp.WantRunning() != newp.WantRunning {
|
||||
b.stateMachine()
|
||||
} else {
|
||||
b.authReconfig()
|
||||
b.authReconfigPub.Publish(authReconfig)
|
||||
}
|
||||
|
||||
b.send(ipn.Notify{Prefs: &prefs})
|
||||
@ -4980,24 +5037,47 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() {
|
||||
}
|
||||
}
|
||||
|
||||
type authReconfigRequest struct {
|
||||
node *nodeBackend
|
||||
prefs ipn.PrefsView
|
||||
prevIfState *netmon.State
|
||||
keyExpired bool
|
||||
blocked bool
|
||||
}
|
||||
|
||||
func (b *LocalBackend) stateForAuthReconfigLocked() authReconfigRequest {
|
||||
return authReconfigRequest{
|
||||
node: b.currentNode(),
|
||||
prefs: b.pm.CurrentPrefs(),
|
||||
prevIfState: b.prevIfState,
|
||||
keyExpired: b.keyExpired,
|
||||
blocked: b.blocked,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) requestAuthReconfigLocked() {
|
||||
b.authReconfigPub.Publish(b.stateForAuthReconfigLocked())
|
||||
}
|
||||
|
||||
// authReconfig pushes a new configuration into wgengine, if engine
|
||||
// updates are not currently blocked, based on the cached netmap and
|
||||
// user prefs.
|
||||
func (b *LocalBackend) authReconfig() {
|
||||
func (b *LocalBackend) authReconfig(r authReconfigRequest) {
|
||||
// Wait for magicsock to process pending [eventbus] events,
|
||||
// such as netmap updates. This should be completed before
|
||||
// wireguard-go is reconfigured. See tailscale/tailscale#16369.
|
||||
b.MagicConn().Synchronize()
|
||||
|
||||
b.mu.Lock()
|
||||
blocked := b.blocked
|
||||
prefs := b.pm.CurrentPrefs()
|
||||
cn := b.currentNode()
|
||||
blocked := r.blocked
|
||||
prefs := r.prefs
|
||||
cn := r.node
|
||||
nm := cn.NetMap()
|
||||
hasPAC := b.prevIfState.HasPAC()
|
||||
hasPAC := r.prevIfState.HasPAC()
|
||||
disableSubnetsIfPAC := cn.SelfHasCap(tailcfg.NodeAttrDisableSubnetsIfPAC)
|
||||
dohURL, dohURLOK := cn.exitNodeCanProxyDNS(prefs.ExitNodeID())
|
||||
dcfg := cn.dnsConfigForNetmap(prefs, b.keyExpired, version.OS())
|
||||
dcfg := cn.dnsConfigForNetmap(prefs, r.keyExpired, version.OS())
|
||||
|
||||
b.mu.Lock()
|
||||
// If the current node is an app connector, ensure the app connector machine is started
|
||||
b.reconfigAppConnectorLocked(nm, prefs)
|
||||
closing := b.shutdownCalled
|
||||
@ -5008,6 +5088,11 @@ func (b *LocalBackend) authReconfig() {
|
||||
return
|
||||
}
|
||||
|
||||
if cn != b.currentNode() {
|
||||
b.logf("[v1] authReconfig: skipping because node changed.")
|
||||
return
|
||||
}
|
||||
|
||||
if blocked {
|
||||
b.logf("[v1] authReconfig: blocked, skipping.")
|
||||
return
|
||||
@ -5593,6 +5678,8 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
|
||||
}
|
||||
b.pauseOrResumeControlClientLocked()
|
||||
|
||||
authReconfig := b.stateForAuthReconfigLocked()
|
||||
|
||||
unlock.UnlockEarly()
|
||||
|
||||
// prefs may change irrespective of state; WantRunning should be explicitly
|
||||
@ -5625,7 +5712,7 @@ func (b *LocalBackend) enterStateLockedOnEntry(newState ipn.State, unlock unlock
|
||||
feature.SystemdStatus("Stopped; run 'tailscale up' to log in")
|
||||
}
|
||||
case ipn.Starting, ipn.NeedsMachineAuth:
|
||||
b.authReconfig()
|
||||
b.authReconfigPub.Publish(authReconfig)
|
||||
// Needed so that UpdateEndpoints can run
|
||||
b.e.RequestStatus()
|
||||
case ipn.Running:
|
||||
@ -6038,24 +6125,6 @@ func (b *LocalBackend) RefreshExitNode() {
|
||||
if !buildfeatures.HasUseExitNode {
|
||||
return
|
||||
}
|
||||
if b.resolveExitNode() {
|
||||
b.authReconfig()
|
||||
}
|
||||
}
|
||||
|
||||
// resolveExitNode determines which exit node to use based on the current prefs
|
||||
// and netmap. It updates the exit node ID in the prefs if needed, updates the
|
||||
// exit node ID in the hostinfo if needed, sends a notification to clients, and
|
||||
// returns true if the exit node has changed.
|
||||
//
|
||||
// It is the caller's responsibility to reconfigure routes and actually
|
||||
// start using the selected exit node, if needed.
|
||||
//
|
||||
// b.mu must not be held.
|
||||
func (b *LocalBackend) resolveExitNode() (changed bool) {
|
||||
if !buildfeatures.HasUseExitNode {
|
||||
return false
|
||||
}
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
@ -6087,7 +6156,7 @@ func (b *LocalBackend) resolveExitNode() (changed bool) {
|
||||
}
|
||||
|
||||
b.sendToLocked(ipn.Notify{Prefs: ptr.To(prefs.View())}, allClients)
|
||||
return true
|
||||
b.requestAuthReconfigLocked()
|
||||
}
|
||||
|
||||
// reconcilePrefsLocked applies policy overrides, exit node resolution,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user