tailscale/wgengine/magicsock/relaymanager.go
Brad Fitzpatrick 311dd3839d wgengine/magicsock: replace peers slice with peersByID map; add Upsert/RemovePeer
Replace Conn.peers (sorted views.Slice) with peersByID, a
map[tailcfg.NodeID]tailcfg.NodeView. The only caller that needed
the sorted slice (the disco message receive path's binary search)
becomes a single map lookup. Drop nodesEqual.

Add Conn.UpsertPeer / Conn.RemovePeer for O(1) single-peer endpoint
work. RemovePeer also performs a targeted single-disco-key cleanup
(previously that scan was O(discoInfo)).

Extract the shared per-peer upsert body as upsertPeerLocked; still
used by SetNetworkMap's bulk path. SetNetworkMap is documented as
the bulk / initial / self-change path; UpsertPeer and RemovePeer
are preferred for single-peer changes.

Make the relay server set update O(1) per peer: add serverUpsertCh
/ serverRemoveCh to relayManager with matching run-loop handlers.
UpsertPeer / RemovePeer evaluate the per-peer relay predicate
locally and dispatch upsert or remove. The full-rebuild
updateRelayServersSet stays for the initial netmap, filter
changes, and fallback.

Move the hasPeerRelayServers atomic from Conn onto relayManager,
next to the serversByNodeKey map it summarizes. The run loop is
now the single writer and needs no back-pointer to Conn;
endpoint's two hot-path readers take one extra hop to
de.c.relayManager.hasPeerRelayServers but the cost is the same
atomic load.

No callers use UpsertPeer/RemovePeer yet; a subsequent change will
plumb per-peer add/remove through the incremental map update path.

Updates #12542

Change-Id: If6a3442fe29ccbd77890ea61b754a4d1ad6ef225
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2026-04-22 15:07:11 -07:00


// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package magicsock
import (
"context"
"errors"
"fmt"
"net/netip"
"sync"
"sync/atomic"
"time"
"tailscale.com/disco"
"tailscale.com/net/packet"
"tailscale.com/net/stun"
udprelay "tailscale.com/net/udprelay/endpoint"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
"tailscale.com/types/key"
"tailscale.com/util/set"
)
// relayManager manages allocation, handshaking, and initial probing (disco
// ping/pong) of [tailscale.com/net/udprelay.Server] endpoints. The zero value
// is ready for use.
//
// [relayManager] methods can be called by [Conn] and [endpoint] while their .mu
// mutexes are held. Therefore, in order to avoid deadlocks, [relayManager] must
// never attempt to acquire those mutexes synchronously from its runLoop(),
// including synchronous calls back towards [Conn] or [endpoint] methods that
// acquire them.
type relayManager struct {
initOnce sync.Once
// hasPeerRelayServers is whether relayManager is configured with at
// least one peer relay server via [relayManager.handleRelayServersSet]
// (or per-peer variants). Exposed as an atomic so [endpoint] hot paths
// can short-circuit when there are no relay servers without taking any
// lock or entering the run loop. Written only from runLoop() via
// [relayManager.publishHasServersRunLoop].
hasPeerRelayServers atomic.Bool
// ===================================================================
// The following fields are owned by a single goroutine, runLoop().
serversByNodeKey map[key.NodePublic]candidatePeerRelay
allocWorkByCandidatePeerRelayByEndpoint map[*endpoint]map[candidatePeerRelay]*relayEndpointAllocWork
allocWorkByDiscoKeysByServerNodeKey map[key.NodePublic]map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork
handshakeWorkByServerDiscoByEndpoint map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork
handshakeWorkByServerDiscoVNI map[serverDiscoVNI]*relayHandshakeWork
handshakeWorkAwaitingPong map[*relayHandshakeWork]addrPortVNI
addrPortVNIToHandshakeWork map[addrPortVNI]*relayHandshakeWork
handshakeGeneration uint32
allocGeneration uint32
// ===================================================================
// The following chan fields serve event inputs to a single goroutine,
// runLoop().
startDiscoveryCh chan endpointWithLastBest
allocateWorkDoneCh chan relayEndpointAllocWorkDoneEvent
handshakeWorkDoneCh chan relayEndpointHandshakeWorkDoneEvent
cancelWorkCh chan *endpoint
newServerEndpointCh chan newRelayServerEndpointEvent
rxDiscoMsgCh chan relayDiscoMsgEvent
serversCh chan set.Set[candidatePeerRelay]
serverUpsertCh chan candidatePeerRelay
serverRemoveCh chan key.NodePublic
getServersCh chan chan set.Set[candidatePeerRelay]
derpHomeChangeCh chan derpHomeChangeEvent
discoInfoMu syncs.Mutex // guards the following field
discoInfoByServerDisco map[key.DiscoPublic]*relayHandshakeDiscoInfo
// runLoopStoppedCh is written to by runLoop() upon return, enabling event
// writers to restart it when they are blocked (see
// relayManagerInputEvent()).
runLoopStoppedCh chan struct{}
}
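// The hot-path read described on hasPeerRelayServers looks roughly like the
// following (a sketch; the [endpoint] code and the de.c field path are taken
// from the commit message, not from this file):
//
//    // Cheap short-circuit: no peer relay servers are configured, so skip
//    // relay-related work without taking locks or entering the run loop.
//    if !de.c.relayManager.hasPeerRelayServers.Load() {
//        return
//    }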
// serverDiscoVNI represents a [tailscale.com/net/udprelay.Server] disco key
// and Geneve header VNI value for a given [udprelay.ServerEndpoint].
type serverDiscoVNI struct {
serverDisco key.DiscoPublic
vni uint32
}
// relayHandshakeWork serves to track in-progress relay handshake work for a
// [udprelay.ServerEndpoint]. This structure is immutable once initialized.
type relayHandshakeWork struct {
wlb endpointWithLastBest
se udprelay.ServerEndpoint
server candidatePeerRelay
handshakeGen uint32
// handshakeServerEndpoint() always writes to doneCh (len 1) when it
// returns. It may end up writing the same event afterward to
// relayManager.handshakeWorkDoneCh if runLoop() can receive it. runLoop()
// must select{} read on doneCh to prevent deadlock when attempting to write
// to rxDiscoMsgCh.
rxDiscoMsgCh chan relayDiscoMsgEvent
doneCh chan relayEndpointHandshakeWorkDoneEvent
ctx context.Context
cancel context.CancelFunc
}
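// The doneCh contract above is consumed by runLoop() with a two-way select,
// mirroring handleRxDiscoMsgRunLoop():
//
//    select {
//    case done := <-work.doneCh:
//        // The worker already returned; process its result rather than
//        // blocking forever trying to hand it a disco message.
//        r.handleHandshakeWorkDoneRunLoop(done)
//    case work.rxDiscoMsgCh <- event:
//    }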
func (r *relayHandshakeWork) dlogf(format string, args ...any) {
if !r.wlb.ep.c.debugLogging.Load() {
return
}
var relay string
if r.server.nodeKey.IsZero() {
relay = "from-call-me-maybe-via"
} else {
relay = r.server.nodeKey.ShortString()
}
r.wlb.ep.c.logf("%s node=%v relay=%v handshakeGen=%d disco[0]=%v disco[1]=%v",
fmt.Sprintf(format, args...),
r.wlb.ep.publicKey.ShortString(),
relay,
r.handshakeGen,
r.se.ClientDisco[0].ShortString(),
r.se.ClientDisco[1].ShortString(),
)
}
// newRelayServerEndpointEvent indicates a new [udprelay.ServerEndpoint] has
// become known either via allocation with a relay server, or via
// [disco.CallMeMaybeVia] reception. This structure is immutable once
// initialized.
type newRelayServerEndpointEvent struct {
wlb endpointWithLastBest
se udprelay.ServerEndpoint
server candidatePeerRelay // zero value if learned via [disco.CallMeMaybeVia]
}
// relayEndpointAllocWorkDoneEvent indicates relay server endpoint allocation
// work for an [*endpoint] has completed. This structure is immutable once
// initialized.
type relayEndpointAllocWorkDoneEvent struct {
work *relayEndpointAllocWork
allocated udprelay.ServerEndpoint // !allocated.ServerDisco.IsZero() if allocation succeeded
}
// relayEndpointHandshakeWorkDoneEvent indicates relay server endpoint handshake
// work for an [*endpoint] has completed. This structure is immutable once
// initialized.
type relayEndpointHandshakeWorkDoneEvent struct {
work *relayHandshakeWork
pongReceivedFrom netip.AddrPort // or zero value if handshake or ping/pong did not complete
latency time.Duration // only relevant if pongReceivedFrom.IsValid()
}
// hasActiveWorkRunLoop returns true if there is outstanding allocation or
// handshaking work for any endpoint, otherwise it returns false.
func (r *relayManager) hasActiveWorkRunLoop() bool {
return len(r.allocWorkByCandidatePeerRelayByEndpoint) > 0 || len(r.handshakeWorkByServerDiscoByEndpoint) > 0
}
// hasActiveWorkForEndpointRunLoop returns true if there is outstanding
// allocation or handshaking work for the provided endpoint, otherwise it
// returns false.
func (r *relayManager) hasActiveWorkForEndpointRunLoop(ep *endpoint) bool {
_, handshakeWork := r.handshakeWorkByServerDiscoByEndpoint[ep]
_, allocWork := r.allocWorkByCandidatePeerRelayByEndpoint[ep]
return handshakeWork || allocWork
}
// derpHomeChangeEvent represents a change in the DERP home region for the
// node identified by nodeKey. This structure is immutable once initialized.
type derpHomeChangeEvent struct {
nodeKey key.NodePublic
regionID uint16
}
// handleDERPHomeChange handles a DERP home change event for nodeKey and
// regionID.
func (r *relayManager) handleDERPHomeChange(nodeKey key.NodePublic, regionID uint16) {
relayManagerInputEvent(r, nil, &r.derpHomeChangeCh, derpHomeChangeEvent{
nodeKey: nodeKey,
regionID: regionID,
})
}
func (r *relayManager) handleDERPHomeChangeRunLoop(event derpHomeChangeEvent) {
c, ok := r.serversByNodeKey[event.nodeKey]
if ok {
c.derpHomeRegionID = event.regionID
r.serversByNodeKey[event.nodeKey] = c
}
}
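// A DERP home change only rewrites the cached candidatePeerRelay. It matters
// because allocateServerEndpoint() addresses its request over DERP using the
// server's derpHomeRegionID, so later allocations follow the server to its
// new home region.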
// runLoop is a form of event loop. It ensures exclusive access to most of
// [relayManager] state.
func (r *relayManager) runLoop() {
defer func() {
r.runLoopStoppedCh <- struct{}{}
}()
for {
select {
case startDiscovery := <-r.startDiscoveryCh:
if !r.hasActiveWorkForEndpointRunLoop(startDiscovery.ep) {
r.allocateAllServersRunLoop(startDiscovery)
}
if !r.hasActiveWorkRunLoop() {
return
}
case done := <-r.allocateWorkDoneCh:
r.handleAllocWorkDoneRunLoop(done)
if !r.hasActiveWorkRunLoop() {
return
}
case ep := <-r.cancelWorkCh:
r.stopWorkRunLoop(ep)
if !r.hasActiveWorkRunLoop() {
return
}
case newServerEndpoint := <-r.newServerEndpointCh:
r.handleNewServerEndpointRunLoop(newServerEndpoint)
if !r.hasActiveWorkRunLoop() {
return
}
case done := <-r.handshakeWorkDoneCh:
r.handleHandshakeWorkDoneRunLoop(done)
if !r.hasActiveWorkRunLoop() {
return
}
case discoMsgEvent := <-r.rxDiscoMsgCh:
r.handleRxDiscoMsgRunLoop(discoMsgEvent)
if !r.hasActiveWorkRunLoop() {
return
}
case serversUpdate := <-r.serversCh:
r.handleServersUpdateRunLoop(serversUpdate)
if !r.hasActiveWorkRunLoop() {
return
}
case upsert := <-r.serverUpsertCh:
r.handleServerUpsertRunLoop(upsert)
if !r.hasActiveWorkRunLoop() {
return
}
case nk := <-r.serverRemoveCh:
r.handleServerRemoveRunLoop(nk)
if !r.hasActiveWorkRunLoop() {
return
}
case getServersCh := <-r.getServersCh:
r.handleGetServersRunLoop(getServersCh)
if !r.hasActiveWorkRunLoop() {
return
}
case derpHomeChange := <-r.derpHomeChangeCh:
r.handleDERPHomeChangeRunLoop(derpHomeChange)
if !r.hasActiveWorkRunLoop() {
return
}
}
}
}
func (r *relayManager) handleGetServersRunLoop(getServersCh chan set.Set[candidatePeerRelay]) {
servers := make(set.Set[candidatePeerRelay], len(r.serversByNodeKey))
for _, v := range r.serversByNodeKey {
servers.Add(v)
}
getServersCh <- servers
}
func (r *relayManager) getServers() set.Set[candidatePeerRelay] {
ch := make(chan set.Set[candidatePeerRelay])
relayManagerInputEvent(r, nil, &r.getServersCh, ch)
return <-ch
}
func (r *relayManager) handleServersUpdateRunLoop(update set.Set[candidatePeerRelay]) {
for _, v := range r.serversByNodeKey {
if !update.Contains(v) {
delete(r.serversByNodeKey, v.nodeKey)
}
}
for _, v := range update.Slice() {
r.serversByNodeKey[v.nodeKey] = v
}
r.publishHasServersRunLoop()
}
// handleServerUpsertRunLoop inserts or updates cp in serversByNodeKey. It is
// the per-peer analog of [relayManager.handleServersUpdateRunLoop] used by
// [Conn.UpsertPeer].
func (r *relayManager) handleServerUpsertRunLoop(cp candidatePeerRelay) {
r.serversByNodeKey[cp.nodeKey] = cp
r.publishHasServersRunLoop()
}
// handleServerRemoveRunLoop deletes nk from serversByNodeKey. It is a no-op
// if nk isn't currently a known server. It is the per-peer analog of
// [relayManager.handleServersUpdateRunLoop] used by [Conn.RemovePeer] and by
// [Conn.UpsertPeer] when a peer is upserted with fields that make it no
// longer a relay candidate.
func (r *relayManager) handleServerRemoveRunLoop(nk key.NodePublic) {
if _, ok := r.serversByNodeKey[nk]; !ok {
return
}
delete(r.serversByNodeKey, nk)
r.publishHasServersRunLoop()
}
// publishHasServersRunLoop updates [relayManager.hasPeerRelayServers] to
// reflect whether any relay servers are currently known.
func (r *relayManager) publishHasServersRunLoop() {
r.hasPeerRelayServers.Store(len(r.serversByNodeKey) > 0)
}
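// Taken together, the handlers above keep hasPeerRelayServers in lockstep
// with serversByNodeKey. A sketch of the observable effect, assuming cp1 and
// cp2 are two distinct (hypothetical) candidate relay servers:
//
//    r.handleServerUpsertRunLoop(cp1)          // hasPeerRelayServers: true
//    r.handleServerUpsertRunLoop(cp2)          // hasPeerRelayServers: true
//    r.handleServerRemoveRunLoop(cp1.nodeKey)  // still true: cp2 remains
//    r.handleServerRemoveRunLoop(cp2.nodeKey)  // false: set is now empty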
type relayDiscoMsgEvent struct {
conn *Conn // for access to [Conn] if there is no associated [relayHandshakeWork]
msg disco.Message
relayServerNodeKey key.NodePublic // nonzero if msg is a [*disco.AllocateUDPRelayEndpointResponse]
disco key.DiscoPublic
from netip.AddrPort
vni uint32
at time.Time
}
// relayEndpointAllocWork serves to track in-progress relay endpoint allocation
// for an [*endpoint]. This structure is immutable once initialized.
type relayEndpointAllocWork struct {
wlb endpointWithLastBest
discoKeys key.SortedPairOfDiscoPublic
candidatePeerRelay candidatePeerRelay // zero value if learned via [disco.CallMeMaybeVia]
allocGen uint32
// allocateServerEndpoint() always writes to doneCh (len 1) when it
// returns. It may end up writing the same event afterward to
// [relayManager.allocateWorkDoneCh] if runLoop() can receive it. runLoop()
// must select{} read on doneCh to prevent deadlock when attempting to write
// to rxDiscoMsgCh.
rxDiscoMsgCh chan *disco.AllocateUDPRelayEndpointResponse
doneCh chan relayEndpointAllocWorkDoneEvent
ctx context.Context
cancel context.CancelFunc
}
func (r *relayEndpointAllocWork) dlogf(format string, args ...any) {
if !r.wlb.ep.c.debugLogging.Load() {
return
}
r.wlb.ep.c.logf("%s node=%v relay=%v allocGen=%d disco[0]=%v disco[1]=%v",
fmt.Sprintf(format, args...),
r.wlb.ep.publicKey.ShortString(),
r.candidatePeerRelay.nodeKey.ShortString(),
r.allocGen,
r.discoKeys.Get()[0].ShortString(),
r.discoKeys.Get()[1].ShortString(),
)
}
// init initializes [relayManager] if it is not already initialized.
func (r *relayManager) init() {
r.initOnce.Do(func() {
r.discoInfoByServerDisco = make(map[key.DiscoPublic]*relayHandshakeDiscoInfo)
r.serversByNodeKey = make(map[key.NodePublic]candidatePeerRelay)
r.allocWorkByCandidatePeerRelayByEndpoint = make(map[*endpoint]map[candidatePeerRelay]*relayEndpointAllocWork)
r.allocWorkByDiscoKeysByServerNodeKey = make(map[key.NodePublic]map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork)
r.handshakeWorkByServerDiscoByEndpoint = make(map[*endpoint]map[key.DiscoPublic]*relayHandshakeWork)
r.handshakeWorkByServerDiscoVNI = make(map[serverDiscoVNI]*relayHandshakeWork)
r.handshakeWorkAwaitingPong = make(map[*relayHandshakeWork]addrPortVNI)
r.addrPortVNIToHandshakeWork = make(map[addrPortVNI]*relayHandshakeWork)
r.startDiscoveryCh = make(chan endpointWithLastBest)
r.allocateWorkDoneCh = make(chan relayEndpointAllocWorkDoneEvent)
r.handshakeWorkDoneCh = make(chan relayEndpointHandshakeWorkDoneEvent)
r.cancelWorkCh = make(chan *endpoint)
r.newServerEndpointCh = make(chan newRelayServerEndpointEvent)
r.rxDiscoMsgCh = make(chan relayDiscoMsgEvent)
r.serversCh = make(chan set.Set[candidatePeerRelay])
r.serverUpsertCh = make(chan candidatePeerRelay)
r.serverRemoveCh = make(chan key.NodePublic)
r.getServersCh = make(chan chan set.Set[candidatePeerRelay])
r.derpHomeChangeCh = make(chan derpHomeChangeEvent)
r.runLoopStoppedCh = make(chan struct{}, 1)
r.runLoopStoppedCh <- struct{}{}
})
}
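// Note that init() does not start runLoop(). It seeds runLoopStoppedCh with a
// single token so that the first relayManagerInputEvent() call observes a
// stopped run loop and starts one on demand.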
// relayHandshakeDiscoInfo serves to cache a [*discoInfo] for outstanding
// [*relayHandshakeWork] against a given relay server.
type relayHandshakeDiscoInfo struct {
work set.Set[*relayHandshakeWork] // guarded by relayManager.discoInfoMu
di *discoInfo // immutable once initialized
}
// ensureDiscoInfoFor ensures a [*discoInfo] will be returned by discoInfo() for
// the server disco key associated with 'work'. Callers must also call
// derefDiscoInfoFor() when 'work' is complete.
func (r *relayManager) ensureDiscoInfoFor(work *relayHandshakeWork) {
r.discoInfoMu.Lock()
defer r.discoInfoMu.Unlock()
di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
if !ok {
di = &relayHandshakeDiscoInfo{}
di.work.Make()
r.discoInfoByServerDisco[work.se.ServerDisco] = di
}
di.work.Add(work)
if di.di == nil {
di.di = &discoInfo{
discoKey: work.se.ServerDisco,
discoShort: work.se.ServerDisco.ShortString(),
sharedKey: work.wlb.ep.c.discoAtomic.Private().Shared(work.se.ServerDisco),
}
}
}
// derefDiscoInfoFor decrements the reference count of the [*discoInfo]
// associated with 'work'.
func (r *relayManager) derefDiscoInfoFor(work *relayHandshakeWork) {
r.discoInfoMu.Lock()
defer r.discoInfoMu.Unlock()
di, ok := r.discoInfoByServerDisco[work.se.ServerDisco]
if !ok {
// TODO(jwhited): unexpected
return
}
di.work.Delete(work)
if di.work.Len() == 0 {
delete(r.discoInfoByServerDisco, work.se.ServerDisco)
}
}
// discoInfo returns a [*discoInfo] for 'serverDisco' if there is an
// active/ongoing handshake with it, otherwise it returns nil, false.
func (r *relayManager) discoInfo(serverDisco key.DiscoPublic) (_ *discoInfo, ok bool) {
r.discoInfoMu.Lock()
defer r.discoInfoMu.Unlock()
di, ok := r.discoInfoByServerDisco[serverDisco]
if ok {
return di.di, ok
}
return nil, false
}
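// Typical pairing of the three functions above. The worker side is what
// handshakeServerEndpoint() does; the reader side is a sketch of how a disco
// receive path elsewhere in magicsock might consult the cache:
//
//    // worker: keep a *discoInfo alive for the lifetime of the handshake work
//    r.ensureDiscoInfoFor(work)
//    defer r.derefDiscoInfoFor(work)
//
//    // reader (hypothetical caller): only in-flight handshakes yield a hit
//    if di, ok := r.discoInfo(serverDisco); ok {
//        _ = di // use di to verify/open the inbound disco message
//    }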
func (r *relayManager) handleCallMeMaybeVia(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool, dm *disco.CallMeMaybeVia) {
se := udprelay.ServerEndpoint{
ServerDisco: dm.ServerDisco,
ClientDisco: dm.ClientDisco,
LamportID: dm.LamportID,
AddrPorts: dm.AddrPorts,
VNI: dm.VNI,
}
se.BindLifetime.Duration = dm.BindLifetime
se.SteadyStateLifetime.Duration = dm.SteadyStateLifetime
relayManagerInputEvent(r, nil, &r.newServerEndpointCh, newRelayServerEndpointEvent{
wlb: endpointWithLastBest{
ep: ep,
lastBest: lastBest,
lastBestIsTrusted: lastBestIsTrusted,
},
se: se,
})
}
// handleRxDiscoMsg handles reception of disco messages that [relayManager]
// may be interested in. This includes all Geneve-encapsulated disco messages
// and [*disco.AllocateUDPRelayEndpointResponse]. If dm is a
// [*disco.AllocateUDPRelayEndpointResponse] then relayServerNodeKey must be
// nonzero.
func (r *relayManager) handleRxDiscoMsg(conn *Conn, dm disco.Message, relayServerNodeKey key.NodePublic, discoKey key.DiscoPublic, src epAddr) {
relayManagerInputEvent(r, nil, &r.rxDiscoMsgCh, relayDiscoMsgEvent{
conn: conn,
msg: dm,
relayServerNodeKey: relayServerNodeKey,
disco: discoKey,
from: src.ap,
vni: src.vni.Get(),
at: time.Now(),
})
}
// handleRelayServersSet handles an update of the complete relay server set.
func (r *relayManager) handleRelayServersSet(servers set.Set[candidatePeerRelay]) {
relayManagerInputEvent(r, nil, &r.serversCh, servers)
}
// handleRelayServerUpsert is the O(1) per-peer variant of
// [relayManager.handleRelayServersSet]: it inserts or updates a single
// relay server entry.
func (r *relayManager) handleRelayServerUpsert(cp candidatePeerRelay) {
relayManagerInputEvent(r, nil, &r.serverUpsertCh, cp)
}
// handleRelayServerRemove is the O(1) per-peer variant of
// [relayManager.handleRelayServersSet]: it removes a single relay server
// entry by node key. It is a no-op if nk is not currently a known relay
// server.
func (r *relayManager) handleRelayServerRemove(nk key.NodePublic) {
relayManagerInputEvent(r, nil, &r.serverRemoveCh, nk)
}
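// A sketch of the intended caller side per the commit message (Conn.UpsertPeer
// and Conn.RemovePeer are not in this file; the predicate helper and the field
// sources shown here are assumptions):
//
//    cp := candidatePeerRelay{
//        nodeKey:          peer.Key(),
//        discoKey:         peer.DiscoKey(),
//        derpHomeRegionID: derpHomeRegion,
//    }
//    if peerIsRelayServer(peer) { // hypothetical per-peer relay predicate
//        c.relayManager.handleRelayServerUpsert(cp)
//    } else {
//        c.relayManager.handleRelayServerRemove(cp.nodeKey)
//    }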
// relayManagerInputEvent initializes [relayManager] if necessary, starts
// relayManager.runLoop() if it is not running, and writes 'event' on 'eventCh'.
//
// [relayManager] initialization will make `*eventCh`, so it must be passed as
// a pointer to a channel.
//
// 'ctx' allows the write to be abandoned when runLoop is itself waiting for
// the calling goroutine to return, i.e. when the calling goroutine was spawned
// by runLoop and is cancelable via 'ctx'. 'ctx' may be nil.
func relayManagerInputEvent[T any](r *relayManager, ctx context.Context, eventCh *chan T, event T) {
r.init()
var ctxDoneCh <-chan struct{}
if ctx != nil {
ctxDoneCh = ctx.Done()
}
for {
select {
case <-ctxDoneCh:
return
case *eventCh <- event:
return
case <-r.runLoopStoppedCh:
go r.runLoop()
}
}
}
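// A rough sketch of the lazy start/stop cycle this enables (someEventCh
// stands in for any of the event channels above):
//
//    relayManagerInputEvent(r, nil, &r.someEventCh, ev)
//    // If runLoop() is running and able to receive, the event is delivered.
//    // Otherwise <-r.runLoopStoppedCh succeeds, a new runLoop() goroutine is
//    // started, and the send is retried.
//    //
//    // runLoop() later returns (writing to runLoopStoppedCh again) once
//    // hasActiveWorkRunLoop() reports no outstanding work after an event.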
// endpointWithLastBest represents an [*endpoint], its last bestAddr, and
// whether the last bestAddr was trusted (see endpoint.trustBestAddrUntil) at
// the time of init. This structure is immutable once initialized.
type endpointWithLastBest struct {
ep *endpoint
lastBest addrQuality
lastBestIsTrusted bool
}
// startUDPRelayPathDiscoveryFor starts UDP relay path discovery for ep on all
// known relay servers if ep has no in-progress work.
func (r *relayManager) startUDPRelayPathDiscoveryFor(ep *endpoint, lastBest addrQuality, lastBestIsTrusted bool) {
relayManagerInputEvent(r, nil, &r.startDiscoveryCh, endpointWithLastBest{
ep: ep,
lastBest: lastBest,
lastBestIsTrusted: lastBestIsTrusted,
})
}
// stopWork stops all outstanding allocation & handshaking work for 'ep'.
func (r *relayManager) stopWork(ep *endpoint) {
relayManagerInputEvent(r, nil, &r.cancelWorkCh, ep)
}
// stopWorkRunLoop cancels & clears outstanding allocation and handshaking
// work for 'ep'.
func (r *relayManager) stopWorkRunLoop(ep *endpoint) {
byDiscoKeys, ok := r.allocWorkByCandidatePeerRelayByEndpoint[ep]
if ok {
for _, work := range byDiscoKeys {
work.cancel()
done := <-work.doneCh
r.handleAllocWorkDoneRunLoop(done)
}
}
byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[ep]
if ok {
for _, handshakeWork := range byServerDisco {
handshakeWork.cancel()
done := <-handshakeWork.doneCh
r.handleHandshakeWorkDoneRunLoop(done)
}
}
}
// addrPortVNI represents a combined netip.AddrPort and Geneve header virtual
// network identifier.
type addrPortVNI struct {
addrPort netip.AddrPort
vni uint32
}
func (r *relayManager) handleRxDiscoMsgRunLoop(event relayDiscoMsgEvent) {
var (
work *relayHandshakeWork
ok bool
)
apv := addrPortVNI{event.from, event.vni}
switch msg := event.msg.(type) {
case *disco.AllocateUDPRelayEndpointResponse:
sorted := key.NewSortedPairOfDiscoPublic(msg.ClientDisco[0], msg.ClientDisco[1])
byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[event.relayServerNodeKey]
if !ok {
// No outstanding work tied to this relay server, discard.
return
}
allocWork, ok := byDiscoKeys[sorted]
if !ok {
// No outstanding work tied to these disco keys, discard.
return
}
select {
case done := <-allocWork.doneCh:
// allocateServerEndpoint returned, clean up its state
r.handleAllocWorkDoneRunLoop(done)
return
case allocWork.rxDiscoMsgCh <- msg:
return
}
case *disco.BindUDPRelayEndpointChallenge:
work, ok = r.handshakeWorkByServerDiscoVNI[serverDiscoVNI{event.disco, event.vni}]
if !ok {
// No outstanding work tied to this challenge, discard.
return
}
_, ok = r.handshakeWorkAwaitingPong[work]
if ok {
// We've seen a challenge for this relay endpoint previously,
// discard. Servers only respond to the first src ip:port they see
// binds from.
return
}
_, ok = r.addrPortVNIToHandshakeWork[apv]
if ok {
// There is existing work for the same [addrPortVNI] that is not
// 'work'. If both instances happen to be on the same server we
// could attempt to resolve event order using LamportID. For now
// just leave both work instances alone and take no action other
// than to discard this challenge msg.
return
}
// Update state so that future ping/pong will route to 'work'.
r.handshakeWorkAwaitingPong[work] = apv
r.addrPortVNIToHandshakeWork[apv] = work
case *disco.Ping:
// Always TX a pong. We might not have any associated work if ping
// reception raced with our call to [endpoint.udpRelayEndpointReady()], so
// err on the side of enabling the remote side to use this path.
//
// Conn.handlePingLocked() makes efforts to suppress duplicate pongs
// where the same ping can be received both via raw socket and UDP
// socket on Linux. We make no such efforts here as the raw socket BPF
// program does not support Geneve-encapsulated disco, and is also
// disabled by default.
vni := packet.VirtualNetworkID{}
vni.Set(event.vni)
go event.conn.sendDiscoMessage(epAddr{ap: event.from, vni: vni}, key.NodePublic{}, event.disco, &disco.Pong{
TxID: msg.TxID,
Src: event.from,
}, discoVerboseLog)
work, ok = r.addrPortVNIToHandshakeWork[apv]
if !ok {
// No outstanding work tied to this [addrPortVNI], return early.
return
}
case *disco.Pong:
work, ok = r.addrPortVNIToHandshakeWork[apv]
if !ok {
// No outstanding work tied to this [addrPortVNI], discard.
return
}
default:
// Unexpected message type, discard.
return
}
select {
case done := <-work.doneCh:
// handshakeServerEndpoint() returned, clean up its state.
r.handleHandshakeWorkDoneRunLoop(done)
return
case work.rxDiscoMsgCh <- event:
return
}
}
func (r *relayManager) handleAllocWorkDoneRunLoop(done relayEndpointAllocWorkDoneEvent) {
byCandidatePeerRelay, ok := r.allocWorkByCandidatePeerRelayByEndpoint[done.work.wlb.ep]
if !ok {
return
}
work, ok := byCandidatePeerRelay[done.work.candidatePeerRelay]
if !ok || work != done.work {
return
}
delete(byCandidatePeerRelay, done.work.candidatePeerRelay)
if len(byCandidatePeerRelay) == 0 {
delete(r.allocWorkByCandidatePeerRelayByEndpoint, done.work.wlb.ep)
}
byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[done.work.candidatePeerRelay.nodeKey]
if !ok {
// unexpected
return
}
delete(byDiscoKeys, done.work.discoKeys)
if len(byDiscoKeys) == 0 {
delete(r.allocWorkByDiscoKeysByServerNodeKey, done.work.candidatePeerRelay.nodeKey)
}
if !done.allocated.ServerDisco.IsZero() {
r.handleNewServerEndpointRunLoop(newRelayServerEndpointEvent{
wlb: done.work.wlb,
se: done.allocated,
server: done.work.candidatePeerRelay,
})
}
}
func (r *relayManager) handleHandshakeWorkDoneRunLoop(done relayEndpointHandshakeWorkDoneEvent) {
byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[done.work.wlb.ep]
if !ok {
return
}
work, ok := byServerDisco[done.work.se.ServerDisco]
if !ok || work != done.work {
return
}
delete(byServerDisco, done.work.se.ServerDisco)
if len(byServerDisco) == 0 {
delete(r.handshakeWorkByServerDiscoByEndpoint, done.work.wlb.ep)
}
delete(r.handshakeWorkByServerDiscoVNI, serverDiscoVNI{done.work.se.ServerDisco, done.work.se.VNI})
apv, ok := r.handshakeWorkAwaitingPong[work]
if ok {
delete(r.handshakeWorkAwaitingPong, work)
delete(r.addrPortVNIToHandshakeWork, apv)
}
if !done.pongReceivedFrom.IsValid() {
// The handshake or ping/pong probing timed out.
return
}
// This relay endpoint is functional.
vni := packet.VirtualNetworkID{}
vni.Set(done.work.se.VNI)
addr := epAddr{ap: done.pongReceivedFrom, vni: vni}
// ep.udpRelayEndpointReady() must be called in a new goroutine to prevent
// deadlocks as it acquires [endpoint] & [Conn] mutexes. See [relayManager]
// docs for details.
go done.work.wlb.ep.udpRelayEndpointReady(addrQuality{
epAddr: addr,
relayServerDisco: done.work.se.ServerDisco,
latency: done.latency,
wireMTU: pingSizeToPktLen(0, addr),
})
}
func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelayServerEndpointEvent) {
// Check for duplicate work by server disco + VNI.
sdv := serverDiscoVNI{newServerEndpoint.se.ServerDisco, newServerEndpoint.se.VNI}
existingWork, ok := r.handshakeWorkByServerDiscoVNI[sdv]
if ok {
// There's in-progress handshake work for the server disco + VNI, which
// uniquely identify a [udprelay.ServerEndpoint]. Compare Lamport
// IDs to determine which is newer.
if existingWork.se.LamportID >= newServerEndpoint.se.LamportID {
// The existing work is a duplicate or newer. Return early.
return
}
// The existing work is no longer valid, clean it up.
existingWork.cancel()
done := <-existingWork.doneCh
r.handleHandshakeWorkDoneRunLoop(done)
}
// Check for duplicate work by [*endpoint] + server disco.
byServerDisco, ok := r.handshakeWorkByServerDiscoByEndpoint[newServerEndpoint.wlb.ep]
if ok {
existingWork, ok := byServerDisco[newServerEndpoint.se.ServerDisco]
if ok {
if newServerEndpoint.se.LamportID <= existingWork.se.LamportID {
// The "new" server endpoint is outdated or duplicate in
// consideration against existing handshake work. Return early.
return
}
// Cancel the existing handshake, which has a lower Lamport ID.
existingWork.cancel()
done := <-existingWork.doneCh
r.handleHandshakeWorkDoneRunLoop(done)
}
}
// We're now reasonably sure we're dealing with the latest
// [udprelay.ServerEndpoint] from a server event order perspective
// (LamportID).
if newServerEndpoint.server.isValid() {
// Send a [disco.CallMeMaybeVia] to the remote peer if we allocated this
// endpoint, regardless of whether we start a handshake below.
go r.sendCallMeMaybeVia(newServerEndpoint.wlb.ep, newServerEndpoint.se)
}
lastBestMatchingServer := newServerEndpoint.se.ServerDisco.Compare(newServerEndpoint.wlb.lastBest.relayServerDisco) == 0
if lastBestMatchingServer && newServerEndpoint.wlb.lastBestIsTrusted {
// This relay server endpoint is the same as [endpoint]'s bestAddr at
// the time UDP relay path discovery was started, and it was also a
// trusted path (see endpoint.trustBestAddrUntil), so return early.
//
// If we were to start a new handshake, there is a chance that we
// cause [endpoint] to blackhole some packets on its bestAddr if we end
// up shifting to a new address family or src, e.g. IPv4 to IPv6, due to
// the window of time between the handshake completing, and our call to
// udpRelayEndpointReady(). The relay server can only forward packets
// from us on a single [epAddr].
return
}
// TODO(jwhited): if lastBest is untrusted, consider some strategies
// to reduce the chance we blackhole if it were to transition to
// trusted during/before the new handshake:
// 1. Start by attempting a handshake with only lastBest.epAddr. If
// that fails then try the remaining [epAddr]s.
// 2. Signal bestAddr trust transitions between [endpoint] and
// [relayManager] in order to prevent a handshake from starting
// and/or stop one that is running.
// We're ready to start a new handshake.
ctx, cancel := context.WithCancel(context.Background())
work := &relayHandshakeWork{
wlb: newServerEndpoint.wlb,
se: newServerEndpoint.se,
server: newServerEndpoint.server,
rxDiscoMsgCh: make(chan relayDiscoMsgEvent),
doneCh: make(chan relayEndpointHandshakeWorkDoneEvent, 1),
ctx: ctx,
cancel: cancel,
}
// We must look up byServerDisco again. The previous value may have been
// deleted from the outer map when cleaning up duplicate work.
byServerDisco, ok = r.handshakeWorkByServerDiscoByEndpoint[newServerEndpoint.wlb.ep]
if !ok {
byServerDisco = make(map[key.DiscoPublic]*relayHandshakeWork)
r.handshakeWorkByServerDiscoByEndpoint[newServerEndpoint.wlb.ep] = byServerDisco
}
byServerDisco[newServerEndpoint.se.ServerDisco] = work
r.handshakeWorkByServerDiscoVNI[sdv] = work
r.handshakeGeneration++
if r.handshakeGeneration == 0 { // generation must be nonzero
r.handshakeGeneration++
}
work.handshakeGen = r.handshakeGeneration
go r.handshakeServerEndpoint(work)
}
// sendCallMeMaybeVia sends a [disco.CallMeMaybeVia] to ep over DERP. It must be
// called as part of a goroutine independent from runLoop(), for 2 reasons:
// 1. it acquires ep.mu (refer to [relayManager] docs for reasoning)
// 2. it makes a networking syscall, which can introduce unwanted backpressure
func (r *relayManager) sendCallMeMaybeVia(ep *endpoint, se udprelay.ServerEndpoint) {
ep.mu.Lock()
derpAddr := ep.derpAddr
ep.mu.Unlock()
epDisco := ep.disco.Load()
if epDisco == nil || !derpAddr.IsValid() {
return
}
callMeMaybeVia := &disco.CallMeMaybeVia{
UDPRelayEndpoint: disco.UDPRelayEndpoint{
ServerDisco: se.ServerDisco,
ClientDisco: se.ClientDisco,
LamportID: se.LamportID,
VNI: se.VNI,
BindLifetime: se.BindLifetime.Duration,
SteadyStateLifetime: se.SteadyStateLifetime.Duration,
AddrPorts: se.AddrPorts,
},
}
ep.c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, epDisco.key, callMeMaybeVia, discoVerboseLog)
}
func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
done := relayEndpointHandshakeWorkDoneEvent{work: work}
r.ensureDiscoInfoFor(work)
defer func() {
r.derefDiscoInfoFor(work)
work.doneCh <- done
relayManagerInputEvent(r, work.ctx, &r.handshakeWorkDoneCh, done)
work.cancel()
}()
ep := work.wlb.ep
epDisco := ep.disco.Load()
if epDisco == nil {
return
}
common := disco.BindUDPRelayEndpointCommon{
VNI: work.se.VNI,
Generation: work.handshakeGen,
RemoteKey: epDisco.key,
}
work.dlogf("[v1] magicsock: relayManager: starting handshake addrPorts=%v",
work.se.AddrPorts,
)
sentBindAny := false
bind := &disco.BindUDPRelayEndpoint{
BindUDPRelayEndpointCommon: common,
}
vni := packet.VirtualNetworkID{}
vni.Set(work.se.VNI)
for _, addrPort := range work.se.AddrPorts {
if addrPort.IsValid() {
sentBindAny = true
go ep.c.sendDiscoMessage(epAddr{ap: addrPort, vni: vni}, key.NodePublic{}, work.se.ServerDisco, bind, discoVerboseLog)
}
}
if !sentBindAny {
return
}
// Limit goroutine lifetime to a reasonable duration. This is intentionally
// detached and independent of 'BindLifetime' to prevent relay server
// (mis)configuration from negatively impacting client resource usage.
const maxHandshakeLifetime = time.Second * 30
timer := time.NewTimer(min(work.se.BindLifetime.Duration, maxHandshakeLifetime))
defer timer.Stop()
// Limit the number of pings we will transmit. Inbound pings trigger
// outbound pings, so we want to be a little defensive.
const limitPings = 10
var (
handshakeState disco.BindUDPRelayHandshakeState = disco.BindUDPRelayHandshakeStateBindSent
sentPingAt = make(map[stun.TxID]time.Time)
)
txPing := func(to netip.AddrPort, withAnswer *[32]byte) {
if len(sentPingAt) == limitPings {
return
}
txid := stun.NewTxID()
sentPingAt[txid] = time.Now()
ping := &disco.Ping{
TxID: txid,
NodeKey: ep.c.publicKeyAtomic.Load(),
}
go func() {
if withAnswer != nil {
answer := &disco.BindUDPRelayEndpointAnswer{BindUDPRelayEndpointCommon: common}
answer.Challenge = *withAnswer
ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, work.se.ServerDisco, answer, discoVerboseLog)
}
ep.c.sendDiscoMessage(epAddr{ap: to, vni: vni}, key.NodePublic{}, epDisco.key, ping, discoVerboseLog)
}()
}
validateVNIAndRemoteKey := func(common disco.BindUDPRelayEndpointCommon) error {
if common.VNI != work.se.VNI {
return errors.New("mismatching VNI")
}
if common.RemoteKey.Compare(epDisco.key) != 0 {
return errors.New("mismatching RemoteKey")
}
return nil
}
// This for{select{}} is responsible for handshaking and tx'ing ping/pong
// when the handshake is complete.
for {
select {
case <-work.ctx.Done():
work.dlogf("[v1] magicsock: relayManager: handshake canceled")
return
case msgEvent := <-work.rxDiscoMsgCh:
switch msg := msgEvent.msg.(type) {
case *disco.BindUDPRelayEndpointChallenge:
err := validateVNIAndRemoteKey(msg.BindUDPRelayEndpointCommon)
if err != nil {
continue
}
if handshakeState >= disco.BindUDPRelayHandshakeStateAnswerSent {
continue
}
work.dlogf("[v1] magicsock: relayManager: got handshake challenge from %v", msgEvent.from)
txPing(msgEvent.from, &msg.Challenge)
handshakeState = disco.BindUDPRelayHandshakeStateAnswerSent
case *disco.Ping:
if handshakeState < disco.BindUDPRelayHandshakeStateAnswerSent {
continue
}
work.dlogf("[v1] magicsock: relayManager: got relayed ping from %v", msgEvent.from)
// An inbound ping from the remote peer indicates we completed a
// handshake with the relay server (our answer msg was
// received). Chances are our ping was dropped before the remote
// handshake was complete. We need to rx a pong to determine
// latency, so send another ping. Since the handshake is
// complete we do not need to send an answer in front of this
// one.
//
// We don't need to TX a pong, that was already handled for us
// in handleRxDiscoMsgRunLoop().
txPing(msgEvent.from, nil)
case *disco.Pong:
at, ok := sentPingAt[msg.TxID]
if !ok {
continue
}
// The relay server endpoint is functional! Record the
// round-trip latency and return.
done.pongReceivedFrom = msgEvent.from
done.latency = time.Since(at)
work.dlogf("[v1] magicsock: relayManager: got relayed pong from %v latency=%v",
msgEvent.from,
done.latency.Round(time.Millisecond),
)
return
default:
// unexpected message type, silently discard
continue
}
case <-timer.C:
// The handshake timed out.
work.dlogf("[v1] magicsock: relayManager: handshake timed out")
return
}
}
}
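// In rough summary, a successful exchange for a single [epAddr] looks like:
//
//    client -> relay : BindUDPRelayEndpoint             (one per AddrPort)
//    relay  -> client: BindUDPRelayEndpointChallenge    (first src only)
//    client -> relay : BindUDPRelayEndpointAnswer, then disco.Ping
//    peer   -> client: disco.Pong (relayed)             -> latency recorded, done
//
// An inbound relayed disco.Ping before our Pong arrives simply triggers
// another outbound Ping; the Pong reply to the peer's Ping was already sent
// by handleRxDiscoMsgRunLoop().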
const allocateUDPRelayEndpointRequestTimeout = time.Second * 10
func (r *relayManager) allocateServerEndpoint(work *relayEndpointAllocWork) {
done := relayEndpointAllocWorkDoneEvent{work: work}
defer func() {
work.doneCh <- done
relayManagerInputEvent(r, work.ctx, &r.allocateWorkDoneCh, done)
work.cancel()
}()
dm := &disco.AllocateUDPRelayEndpointRequest{
ClientDisco: work.discoKeys.Get(),
Generation: work.allocGen,
}
sendAllocReq := func() {
work.wlb.ep.c.sendDiscoAllocateUDPRelayEndpointRequest(
epAddr{
ap: netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, work.candidatePeerRelay.derpHomeRegionID),
},
work.candidatePeerRelay.nodeKey,
work.candidatePeerRelay.discoKey,
dm,
discoVerboseLog,
)
work.dlogf("[v1] magicsock: relayManager: sent alloc request")
}
go sendAllocReq()
returnAfterTimer := time.NewTimer(allocateUDPRelayEndpointRequestTimeout)
defer returnAfterTimer.Stop()
// Although connections to DERP are over TCP, data can be dropped on the
// DERP server as it moves between the two independent streams. Also, the
// peer relay server may not be "ready" (see [tailscale.com/net/udprelay.ErrServerNotReady]).
// So, start a timer to retry once if needed.
retryAfterTimer := time.NewTimer(udprelay.ServerRetryAfter)
defer retryAfterTimer.Stop()
for {
select {
case <-work.ctx.Done():
work.dlogf("[v1] magicsock: relayManager: alloc request canceled")
return
case <-returnAfterTimer.C:
work.dlogf("[v1] magicsock: relayManager: alloc request timed out")
return
case <-retryAfterTimer.C:
go sendAllocReq()
case resp := <-work.rxDiscoMsgCh:
if resp.Generation != work.allocGen ||
!work.discoKeys.Equal(key.NewSortedPairOfDiscoPublic(resp.ClientDisco[0], resp.ClientDisco[1])) {
continue
}
work.dlogf("[v1] magicsock: relayManager: got alloc response")
done.allocated = udprelay.ServerEndpoint{
ServerDisco: resp.ServerDisco,
ClientDisco: resp.ClientDisco,
LamportID: resp.LamportID,
AddrPorts: resp.AddrPorts,
VNI: resp.VNI,
BindLifetime: tstime.GoDuration{Duration: resp.BindLifetime},
SteadyStateLifetime: tstime.GoDuration{Duration: resp.SteadyStateLifetime},
}
return
}
}
}
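// In summary: the AllocateUDPRelayEndpointRequest is sent to the relay server
// over DERP (DerpMagicIPAddr plus the server's DERP home region), retried once
// after udprelay.ServerRetryAfter, and abandoned after
// allocateUDPRelayEndpointRequestTimeout. A response matching our allocGen and
// sorted disco key pair becomes done.allocated, which runLoop() then feeds to
// handleNewServerEndpointRunLoop() via handleAllocWorkDoneRunLoop().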
func (r *relayManager) allocateAllServersRunLoop(wlb endpointWithLastBest) {
if len(r.serversByNodeKey) == 0 {
return
}
remoteDisco := wlb.ep.disco.Load()
if remoteDisco == nil {
return
}
discoKeys := key.NewSortedPairOfDiscoPublic(wlb.ep.c.discoAtomic.Public(), remoteDisco.key)
for _, v := range r.serversByNodeKey {
byDiscoKeys, ok := r.allocWorkByDiscoKeysByServerNodeKey[v.nodeKey]
if !ok {
byDiscoKeys = make(map[key.SortedPairOfDiscoPublic]*relayEndpointAllocWork)
r.allocWorkByDiscoKeysByServerNodeKey[v.nodeKey] = byDiscoKeys
} else {
_, ok = byDiscoKeys[discoKeys]
if ok {
// If there is an existing key, a disco key collision may have
// occurred across peers ([*endpoint]). Do not overwrite the
// existing work; let it finish.
wlb.ep.c.logf("[unexpected] magicsock: relayManager: suspected disco key collision on server %v for keys: %v", v.nodeKey.ShortString(), discoKeys)
continue
}
}
ctx, cancel := context.WithCancel(context.Background())
started := &relayEndpointAllocWork{
wlb: wlb,
discoKeys: discoKeys,
candidatePeerRelay: v,
rxDiscoMsgCh: make(chan *disco.AllocateUDPRelayEndpointResponse),
doneCh: make(chan relayEndpointAllocWorkDoneEvent, 1),
ctx: ctx,
cancel: cancel,
}
byDiscoKeys[discoKeys] = started
byCandidatePeerRelay, ok := r.allocWorkByCandidatePeerRelayByEndpoint[wlb.ep]
if !ok {
byCandidatePeerRelay = make(map[candidatePeerRelay]*relayEndpointAllocWork)
r.allocWorkByCandidatePeerRelayByEndpoint[wlb.ep] = byCandidatePeerRelay
}
byCandidatePeerRelay[v] = started
r.allocGeneration++
started.allocGen = r.allocGeneration
go r.allocateServerEndpoint(started)
}
}