net/netcheck, wgengine/magicsock: bound DERP latency by TCP RTT

Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: Ie554364227ce7973644025129b8046fc7df8fb35
This commit is contained in:
Andrew Dunham 2023-07-13 10:43:24 -04:00
parent f7016d8c00
commit 038751bf2d
5 changed files with 89 additions and 13 deletions

View File

@ -10,12 +10,14 @@ import (
"errors"
"fmt"
"io"
"net"
"net/netip"
"sync"
"time"
"go4.org/mem"
"golang.org/x/time/rate"
"tailscale.com/net/tcpinfo"
"tailscale.com/syncs"
"tailscale.com/types/key"
"tailscale.com/types/logger"
@ -621,3 +623,16 @@ func (c *Client) LocalAddr() (netip.AddrPort, error) {
}
return netip.ParseAddrPort(a.String())
}
// RTT returns the current TCP round-trip time (RTT) between the current node
// and the DERP server, or an error if this is unimplemented on the current
// platform or the value cannot be retrieved.
func (c *Client) RTT() (time.Duration, error) {
// If the underlying value isn't a net.Conn, it's something that we
// don't support; abort now.
nc, ok := c.nc.(net.Conn)
if !ok {
return 0, tcpinfo.ErrNotTCP
}
return tcpinfo.RTT(nc)
}

View File

@ -851,7 +851,7 @@ func (c *Client) SendPing(data [8]byte) error {
return ErrClientClosed
}
if client == nil {
return errors.New("client not connected")
return ErrClientNotConnected
}
return client.SendPing(data)
}
@ -866,11 +866,26 @@ func (c *Client) LocalAddr() (netip.AddrPort, error) {
return netip.AddrPort{}, ErrClientClosed
}
if client == nil {
return netip.AddrPort{}, errors.New("client not connected")
return netip.AddrPort{}, ErrClientNotConnected
}
return client.LocalAddr()
}
// RTT reports the TCP RTT for the currently-active DERP connection, without
// any implicit connect or reconnect.
func (c *Client) RTT() (time.Duration, error) {
c.mu.Lock()
closed, client := c.closed, c.client
c.mu.Unlock()
if closed {
return 0, ErrClientClosed
}
if client == nil {
return 0, ErrClientNotConnected
}
return client.RTT()
}
func (c *Client) ForwardPacket(from, to key.NodePublic, b []byte) error {
client, _, err := c.connect(context.TODO(), "derphttp.Client.ForwardPacket")
if err != nil {
@ -1045,6 +1060,7 @@ func (c *Client) closeForReconnect(brokenClient *derp.Client) {
}
var ErrClientClosed = errors.New("derphttp.Client closed")
var ErrClientNotConnected = errors.New("client not connected")
func parseMetaCert(certs []*x509.Certificate) (serverPub key.NodePublic, serverProtoVersion int) {
for _, cert := range certs {

View File

@ -790,10 +790,20 @@ func (c *Client) udpBindAddr() string {
return ":0"
}
// GetReportOptions contains options that can be passed to GetReport.
type GetReportOptions struct {
// DERPRegionLatencyBounds contains upper bounds for the latency to a
// given DERP region, typically determined by having an existing open
// connection to that region. This is used to bound the latency
// determined for a region when selecting a PreferredDERP ("home DERP")
// region.
DERPRegionLatencyBounds map[int]time.Duration
}
// GetReport gets a report.
//
// It may not be called concurrently with itself.
func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap) (_ *Report, reterr error) {
func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetReportOptions) (_ *Report, reterr error) {
defer func() {
if reterr != nil {
metricNumGetReportError.Add(1)
@ -870,7 +880,7 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap) (_ *Report,
if err := c.runHTTPOnlyChecks(ctx, last, rs, dm); err != nil {
return nil, err
}
return c.finishAndStoreReport(rs, dm), nil
return c.finishAndStoreReport(rs, dm, opts), nil
}
var ifState *interfaces.State
@ -1104,15 +1114,15 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap) (_ *Report,
// Wait for captive portal check before finishing the report.
<-captivePortalDone
return c.finishAndStoreReport(rs, dm), nil
return c.finishAndStoreReport(rs, dm, opts), nil
}
func (c *Client) finishAndStoreReport(rs *reportState, dm *tailcfg.DERPMap) *Report {
func (c *Client) finishAndStoreReport(rs *reportState, dm *tailcfg.DERPMap, opts *GetReportOptions) *Report {
rs.mu.Lock()
report := rs.report.Clone()
rs.mu.Unlock()
c.addReportHistoryAndSetPreferredDERP(report, dm.View())
c.addReportHistoryAndSetPreferredDERP(report, dm.View(), opts)
c.logConciseReport(report, dm)
return report
@ -1457,7 +1467,7 @@ const (
// addReportHistoryAndSetPreferredDERP adds r to the set of recent Reports
// and mutates r.PreferredDERP to contain the best recent one.
func (c *Client) addReportHistoryAndSetPreferredDERP(r *Report, dm tailcfg.DERPMapView) {
func (c *Client) addReportHistoryAndSetPreferredDERP(r *Report, dm tailcfg.DERPMapView, opts *GetReportOptions) {
c.mu.Lock()
defer c.mu.Unlock()
@ -1489,6 +1499,20 @@ func (c *Client) addReportHistoryAndSetPreferredDERP(r *Report, dm tailcfg.DERPM
}
}
// Bound each region's latency by the TCP RTT, if we have that option.
if opts != nil {
for regionID, bound := range opts.DERPRegionLatencyBounds {
curr, ok := bestRecent[regionID]
if !ok {
continue
}
if curr > bound {
bestRecent[regionID] = bound
}
}
}
// Scale each region's best latency by any provided scores from the
// DERPMap, for use in comparison below.
var scores views.Map[int, float64]

View File

@ -166,7 +166,7 @@ func TestBasic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()))
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()), nil)
if err != nil {
t.Fatal(err)
}
@ -205,7 +205,7 @@ func TestWorksWhenUDPBlocked(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
r, err := c.GetReport(ctx, dm)
r, err := c.GetReport(ctx, dm, nil)
if err != nil {
t.Fatal(err)
}
@ -401,7 +401,7 @@ func TestAddReportHistoryAndSetPreferredDERP(t *testing.T) {
dm := &tailcfg.DERPMap{HomeParams: tt.homeParams}
for _, s := range tt.steps {
fakeTime = fakeTime.Add(s.after)
c.addReportHistoryAndSetPreferredDERP(s.r, dm.View())
c.addReportHistoryAndSetPreferredDERP(s.r, dm.View(), nil)
}
lastReport := tt.steps[len(tt.steps)-1].r
if got, want := len(c.prev), tt.wantPrevLen; got != want {
@ -862,7 +862,7 @@ func TestNoCaptivePortalWhenUDP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()))
r, err := c.GetReport(ctx, stuntest.DERPMapOf(stunAddr.String()), nil)
if err != nil {
t.Fatal(err)
}

View File

@ -841,19 +841,40 @@ func (c *Conn) setNetInfoHavePortMap() {
func (c *Conn) updateNetInfo(ctx context.Context) (*netcheck.Report, error) {
c.mu.Lock()
dm := c.derpMap
// We want to get the active RTTs for all open DERP connections, since
// we can use this to bound the latency to a given DERP region.
// However, we don't want to hold the mutex while making a bunch of
// syscalls; grab all clients here, and then actually fetch them below.
openDERPs := make(map[int]*derphttp.Client, len(c.activeDerp))
for regionID, ad := range c.activeDerp {
openDERPs[regionID] = ad.c
}
c.mu.Unlock()
if dm == nil || c.networkDown() {
return new(netcheck.Report), nil
}
derpBounds := make(map[int]time.Duration, len(openDERPs))
for regionID, dclient := range openDERPs {
if dur, err := dclient.RTT(); err == nil {
derpBounds[regionID] = dur
} else {
c.dlogf("[v1] magicsock: error fetching RTT for region %d: %v", regionID, err)
}
}
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
c.stunReceiveFunc.Store(c.netChecker.ReceiveSTUNPacket)
defer c.ignoreSTUNPackets()
report, err := c.netChecker.GetReport(ctx, dm)
report, err := c.netChecker.GetReport(ctx, dm, &netcheck.GetReportOptions{
DERPRegionLatencyBounds: derpBounds,
})
if err != nil {
return nil, err
}