diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 5fb3d5771..19b7eed32 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -99,6 +99,7 @@ import ( "tailscale.com/util/clientmetric" "tailscale.com/util/deephash" "tailscale.com/util/dnsname" + "tailscale.com/util/eventbus" "tailscale.com/util/goroutines" "tailscale.com/util/httpm" "tailscale.com/util/mak" @@ -202,6 +203,8 @@ type LocalBackend struct { keyLogf logger.Logf // for printing list of peers on change statsLogf logger.Logf // for printing peers stats on change sys *tsd.System + eventbus *eventbus.Bus + eventClient *eventbus.Client health *health.Tracker // always non-nil metrics metrics e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys @@ -427,6 +430,8 @@ type LocalBackend struct { // // See tailscale/corp#29969. overrideExitNodePolicy bool + + magicSockConfChangeSub *eventbus.Subscriber[magicsock.ConfigurationChanged] } // HealthTracker returns the health tracker for the backend. @@ -532,7 +537,11 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running needsCaptiveDetection: make(chan bool), } - nb := newNodeBackend(ctx, b.sys.Bus.Get()) + b.eventbus = sys.Bus.Get() + b.eventClient = b.eventbus.Client("ipnlocal.LocalBackend") + b.magicSockConfChangeSub = eventbus.Subscribe[magicsock.ConfigurationChanged](b.eventClient) + + nb := newNodeBackend(ctx, b.eventbus) b.currentNodeAtomic.Store(nb) nb.ready() @@ -605,6 +614,21 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo return b, nil } +func (b *LocalBackend) consumeEventbusTopics() { + for { + select { + case <-b.ctx.Done(): + b.magicSockConfChangeSub.Close() + return + case <-b.magicSockConfChangeSub.Events(): + if b.ctx.Err() != nil { + return + } + go b.authReconfig() + } + } +} + func (b *LocalBackend) Clock() tstime.Clock { return b.clock } func (b *LocalBackend) Sys() *tsd.System { return b.sys } @@ -1756,9 +1780,6 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control b.setAuthURL(st.URL) } b.stateMachine() - // This is currently (2020-07-28) necessary; conditionally disabling it is fragile! - // This is where netmap information gets propagated to router and magicsock. - b.authReconfig() } type preferencePolicyInfo struct { @@ -2262,6 +2283,7 @@ func (b *LocalBackend) getNewControlClientFuncLocked() clientGen { // initOnce is called on the first call to [LocalBackend.Start]. func (b *LocalBackend) initOnce() { + go b.consumeEventbusTopics() b.extHost.Init() } @@ -4422,7 +4444,6 @@ func (b *LocalBackend) changeDisablesExitNodeLocked(prefs ipn.PrefsView, change // but wasn't empty before, then the change disables // exit node usage. return tmpPrefs.ExitNodeID == "" - } // adjustEditPrefsLocked applies additional changes to mp if necessary, @@ -5087,11 +5108,6 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() { // updates are not currently blocked, based on the cached netmap and // user prefs. func (b *LocalBackend) authReconfig() { - // Wait for magicsock to process pending [eventbus] events, - // such as netmap updates. This should be completed before - // wireguard-go is reconfigured. See tailscale/tailscale#16369. - b.MagicConn().Synchronize() - b.mu.Lock() blocked := b.blocked prefs := b.pm.CurrentPrefs() @@ -7337,7 +7353,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err // down, so no need to do any work. return nil } - newNode := newNodeBackend(b.ctx, b.sys.Bus.Get()) + newNode := newNodeBackend(b.ctx, b.eventbus) if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil { oldNode.shutdown(errNodeContextChanged) } @@ -7676,10 +7692,8 @@ var ( // allowedAutoRoute determines if the route being added via AdvertiseRoute (the app connector featuge) should be allowed. func allowedAutoRoute(ipp netip.Prefix) bool { // Note: blocking the addrs for globals, not solely the prefixes. - for _, addr := range disallowedAddrs { - if ipp.Addr() == addr { - return false - } + if slices.Contains(disallowedAddrs, ipp.Addr()) { + return false } for _, pfx := range disallowedRanges { if pfx.Overlaps(ipp) { @@ -8113,7 +8127,6 @@ func isAllowedAutoExitNodeID(exitNodeID tailcfg.StableNodeID) bool { } if nodes, _ := syspolicy.GetStringArray(syspolicy.AllowedSuggestedExitNodes, nil); nodes != nil { return slices.Contains(nodes, string(exitNodeID)) - } return true // no policy configured; allow all exit nodes } @@ -8257,9 +8270,7 @@ func (b *LocalBackend) vipServicesFromPrefsLocked(prefs ipn.PrefsView) []*tailcf return servicesList } -var ( - metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus") -) +var metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus") func (b *LocalBackend) stateEncrypted() opt.Bool { switch runtime.GOOS { diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index a59a38f65..08bdb4620 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -14,6 +14,7 @@ import ( "expvar" "fmt" "io" + "maps" "net" "net/netip" "reflect" @@ -181,11 +182,11 @@ type Conn struct { filterSub *eventbus.Subscriber[FilterUpdate] nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate] nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate] - syncSub *eventbus.Subscriber[syncPoint] - syncPub *eventbus.Publisher[syncPoint] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp] + configChangedPub *eventbus.Publisher[ConfigurationChanged] subsDoneCh chan struct{} // closed when consumeEventbusTopics returns + netInfoPub *eventbus.Publisher[tailcfg.NetInfo] // pconn4 and pconn6 are the underlying UDP sockets used to // send/receive packets for wireguard and other magicsock @@ -423,6 +424,8 @@ type Conn struct { // metrics contains the metrics for the magicsock instance. metrics *metrics + + hasReconfigured chan any } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. @@ -562,20 +565,7 @@ type FilterUpdate struct { *filter.Filter } -// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize]. -// It serves as a synchronization point, allowing to wait until magicsock -// has processed all pending events. -type syncPoint chan struct{} - -// Wait blocks until [syncPoint.Signal] is called. -func (s syncPoint) Wait() { - <-s -} - -// Signal signals the sync point, unblocking the [syncPoint.Wait] call. -func (s syncPoint) Signal() { - close(s) -} +type ConfigurationChanged struct{} // UDPRelayAllocReq represents a [*disco.AllocateUDPRelayEndpointRequest] // reception event. This is signaled over an [eventbus.Bus] from @@ -612,15 +602,16 @@ type UDPRelayAllocResp struct { func newConn(logf logger.Logf) *Conn { discoPrivate := key.NewDisco() c := &Conn{ - logf: logf, - derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736 - derpStarted: make(chan struct{}), - peerLastDerp: make(map[key.NodePublic]int), - peerMap: newPeerMap(), - discoInfo: make(map[key.DiscoPublic]*discoInfo), - discoPrivate: discoPrivate, - discoPublic: discoPrivate.Public(), - cloudInfo: newCloudInfo(logf), + logf: logf, + derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736 + derpStarted: make(chan struct{}), + peerLastDerp: make(map[key.NodePublic]int), + peerMap: newPeerMap(), + discoInfo: make(map[key.DiscoPublic]*discoInfo), + discoPrivate: discoPrivate, + discoPublic: discoPrivate.Public(), + cloudInfo: newCloudInfo(logf), + hasReconfigured: make(chan any, 25), } c.discoShort = c.discoPublic.ShortString() c.bind = &connBind{Conn: c, closed: true} @@ -658,15 +649,22 @@ func (c *Conn) consumeEventbusTopics() { c.onPortMapChanged() case filterUpdate := <-c.filterSub.Events(): c.onFilterUpdate(filterUpdate) + c.hasReconfigured <- new(any) case nodeViews := <-c.nodeViewsSub.Events(): c.onNodeViewsUpdate(nodeViews) + c.hasReconfigured <- new(any) case nodeMuts := <-c.nodeMutsSub.Events(): c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-c.syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() case allocResp := <-c.allocRelayEndpointSub.Events(): c.onUDPRelayAllocResp(allocResp) + c.hasReconfigured <- new(any) + case <-c.hasReconfigured: + c.dlogf("magicsock: configuration has changed") + // Drain channel as we only want to reconfigure once + for len(c.hasReconfigured) > 0 { + <-c.hasReconfigured + } + c.configChangedPub.Publish(ConfigurationChanged{}) } } } @@ -700,18 +698,6 @@ func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) { go c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, disco.key, allocResp.Message, discoVerboseLog) } -// Synchronize waits for all [eventbus] events published -// prior to this call to be processed by the receiver. -func (c *Conn) Synchronize() { - if c.syncPub == nil { - // Eventbus is not used; no need to synchronize (in certain tests). - return - } - sp := syncPoint(make(chan struct{})) - c.syncPub.Publish(sp) - sp.Wait() -} - // NewConn creates a magic Conn listening on opts.Port. // As the set of possible endpoints for a Conn changes, the // callback opts.EndpointsFunc is called. @@ -741,10 +727,10 @@ func NewConn(opts Options) (*Conn, error) { c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) - c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) - c.syncPub = eventbus.Publish[syncPoint](c.eventClient) c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) + c.netInfoPub = eventbus.Publish[tailcfg.NetInfo](c.eventClient) + c.configChangedPub = eventbus.Publish[ConfigurationChanged](c.eventClient) c.subsDoneCh = make(chan struct{}) go c.consumeEventbusTopics() @@ -1123,12 +1109,21 @@ func (c *Conn) callNetInfoCallback(ni *tailcfg.NetInfo) { func (c *Conn) callNetInfoCallbackLocked(ni *tailcfg.NetInfo) { c.netInfoLast = ni + c.publishNetInfo(ni) if c.netInfoFunc != nil { c.dlogf("[v1] magicsock: netInfo update: %+v", ni) go c.netInfoFunc(ni) } } +func (c *Conn) publishNetInfo(ni *tailcfg.NetInfo) { + if c.netInfoPub != nil { + newNetInfo := *ni + newNetInfo.DERPLatency = maps.Clone(ni.DERPLatency) + c.netInfoPub.Publish(newNetInfo) + } +} + // addValidDiscoPathForTest makes addr a validated disco address for // discoKey. It's used in tests to enable receiving of packets from // addr without having to spin up the entire active discovery @@ -4085,9 +4080,11 @@ type lazyEndpoint struct { src epAddr } -var _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil) -var _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil) -var _ conn.Endpoint = (*lazyEndpoint)(nil) +var ( + _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil) + _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil) + _ conn.Endpoint = (*lazyEndpoint)(nil) +) // InitiationMessagePublicKey implements [conn.InitiationAwareEndpoint]. // wireguard-go calls us here if we passed it a [*lazyEndpoint] for an