From c473927558e964104ef249ac6a171630dd49626b Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 23 Mar 2020 14:12:23 -0700 Subject: [PATCH] wgengine/magicsock: clean up, add, improve DERP logs --- wgengine/magicsock/magicsock.go | 141 +++++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 21 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index ac8a564b2..e48eccd81 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -7,6 +7,7 @@ package magicsock import ( + "bytes" "context" "crypto/tls" "encoding/binary" @@ -17,6 +18,7 @@ import ( "math/rand" "net" "os" + "sort" "strconv" "strings" "sync" @@ -104,7 +106,19 @@ type Conn struct { activeDerp map[int]activeDerp prevDerp map[int]*syncs.WaitGroupChan derpTLSConfig *tls.Config // normally nil; used by tests - derpRoute map[key.Public]derpRoute + + // derpRoute contains optional alternate routes to use as an + // optimization instead of contacting a peer via their home + // DERP connection. If they sent us a message on a different + // DERP connection (which should really only be on our DERP + // home connection, or what was once our home), then we + // remember that route here to optimistically use instead of + // creating a new DERP connection back to their home. + derpRoute map[key.Public]derpRoute // TODO: clean up this map sometime? + + // peerLastDerp tracks which DERP node we last used to speak with a + // peer. It's only used to quiet logging, so we only log on change. + peerLastDerp map[key.Public]int // TODO: clean up this map sometime? } // derpRoute is a route entry for a public key, saying that a certain @@ -135,7 +149,8 @@ func (c *Conn) addDerpPeerRoute(peer key.Public, derpID int, dc *derphttp.Client if c.derpRoute == nil { c.derpRoute = make(map[key.Public]derpRoute) } - c.derpRoute[peer] = derpRoute{derpID, dc} + r := derpRoute{derpID, dc} + c.derpRoute[peer] = r } // DerpMagicIP is a fake WireGuard endpoint IP address that means @@ -149,10 +164,14 @@ var derpMagicIP = net.ParseIP(DerpMagicIP).To4() // activeDerp contains fields for an active DERP connection. type activeDerp struct { - c *derphttp.Client - cancel context.CancelFunc - writeCh chan<- derpWriteRequest - lastWrite *time.Time + c *derphttp.Client + cancel context.CancelFunc + writeCh chan<- derpWriteRequest + // lastWrite is the time of the last request for its write + // channel (currently even if there was no write). + // It is always non-nil and initialized to a non-zero Time[ + lastWrite *time.Time + createTime time.Time } // udpAddr is the key in the addrsByUDP map. @@ -218,6 +237,7 @@ func Listen(opts Options) (*Conn, error) { udpRecvCh: make(chan udpReadResult), derpTLSConfig: opts.derpTLSConfig, derps: opts.DERPs, + peerLastDerp: make(map[key.Public]int), } if err := c.initialBind(); err != nil { @@ -275,11 +295,11 @@ func (c *Conn) updateEndpoints(why string) { } }() - c.logf("magicsock.Conn: starting endpoint update (%s)", why) + c.logf("magicsock: starting endpoint update (%s)", why) endpoints, err := c.determineEndpoints(c.connCtx) if err != nil { - c.logf("magicsock.Conn: endpoint update (%s) failed: %v", why, err) + c.logf("magicsock: endpoint update (%s) failed: %v", why, err) // TODO(crawshaw): are there any conditions under which // we should trigger a retry based on the error here? return @@ -422,7 +442,7 @@ func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) { // On change, notify all currently connected DERP servers and // start connecting to our home DERP if we are not already. c.myDerp = derpNum - c.logf("home DERP server is now %v, %v", derpNum, c.derps.ServerByID(derpNum)) + c.logf("magicsock: home DERP server is now %v (%v)", derpNum, c.derps.ServerByID(derpNum)) for i, ad := range c.activeDerp { go ad.c.NotePreferred(i == c.myDerp) } @@ -747,6 +767,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de ad, ok := c.activeDerp[nodeID] if ok { *ad.lastWrite = time.Now() + c.setPeerLastDerpLocked(peer, nodeID, nodeID) return ad.writeCh } @@ -759,12 +780,15 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de if !peer.IsZero() && debugUseDerpRoute { if r, ok := c.derpRoute[peer]; ok { if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc { + c.setPeerLastDerpLocked(peer, r.derpID, nodeID) *ad.lastWrite = time.Now() return ad.writeCh } } } + c.logf("magicsock: adding connection to derp%v", nodeID) + if c.activeDerp == nil { c.activeDerp = make(map[int]activeDerp) c.prevDerp = make(map[int]*syncs.WaitGroupChan) @@ -778,7 +802,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de // so it is safe to do under the mu lock. dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf) if err != nil { - c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err) + c.logf("derphttp.NewClient: node %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err) return nil } @@ -793,7 +817,11 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de ad.writeCh = ch ad.cancel = cancel ad.lastWrite = new(time.Time) + *ad.lastWrite = time.Now() + ad.createTime = time.Now() c.activeDerp[nodeID] = ad + c.logActiveDerpLocked() + c.setPeerLastDerpLocked(peer, nodeID, nodeID) // Build a startGate for the derp reader+writer // goroutines, so they don't start running until any @@ -810,10 +838,24 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- de go c.runDerpReader(ctx, addr, dc, wg, startGate) go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate) - *ad.lastWrite = time.Now() return ad.writeCh } +// c.mu must be held. +func (c *Conn) setPeerLastDerpLocked(peer key.Public, nodeID, homeID int) { + if peer.IsZero() { + return + } + old := c.peerLastDerp[peer] + if old == nodeID { + return + } + c.peerLastDerp[peer] = nodeID + + pubKey := wgcfg.Key(peer) + c.logf("magicsock: derp route for %x (home derp %v) changed from derp %d => %d", pubKey.ShortString(), homeID, old, nodeID) +} + // derpReadResult is the type sent by runDerpClient to ReceiveIPv4 // when a DERP packet is available. // @@ -1122,8 +1164,8 @@ func (c *Conn) SetPrivateKey(privateKey wgcfg.PrivateKey) error { // Key changed. Close existing DERP connections and reconnect to home. myDerp := c.myDerp c.myDerp = 0 - c.logf("magicsock private key set, rebooting connection to home DERP %d", myDerp) - c.closeAllDerpLocked() + c.logf("magicsock: private key set, rebooting connection to home DERP %d", myDerp) + c.closeAllDerpLocked("new-private-key") go c.setNearestDERP(myDerp) return nil @@ -1137,41 +1179,87 @@ func (c *Conn) SetDERPEnabled(wantDerp bool) { c.wantDerp = wantDerp if !wantDerp { - c.closeAllDerpLocked() + c.closeAllDerpLocked("derp-disabled") } } // c.mu must be held. -func (c *Conn) closeAllDerpLocked() { +func (c *Conn) closeAllDerpLocked(why string) { + if len(c.activeDerp) == 0 { + return // without the useless log statement + } for i := range c.activeDerp { - c.closeDerpLocked(i) + c.closeDerpLocked(i, why) } + c.logActiveDerpLocked() } // c.mu must be held. -func (c *Conn) closeDerpLocked(node int) { +// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes. +func (c *Conn) closeDerpLocked(node int, why string) { if ad, ok := c.activeDerp[node]; ok { - c.logf("closing connection to derp%v", node) + c.logf("magicsock: closing connection to derp%v (%v), age %v", node, why, time.Since(ad.createTime).Round(time.Second)) go ad.c.Close() ad.cancel() delete(c.activeDerp, node) } } +var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} + +// c.mu must be held. +func (c *Conn) logActiveDerpLocked() { + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + now := time.Now() + buf.Reset() + buf.WriteString(": ") + c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) { + fmt.Fprintf(buf, "derp%d=cr%v,wr%v ", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite))) + }) + var details []byte + if buf.Len() > len(": ") { + details = bytes.TrimSpace(buf.Bytes()) + } + c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), details) +} + +// c.mu must be held. +func (c *Conn) foreachActiveDerpSortedLocked(fn func(nodeID int, ad activeDerp)) { + if len(c.activeDerp) < 2 { + for id, ad := range c.activeDerp { + fn(id, ad) + } + return + } + ids := make([]int, 0, len(c.activeDerp)) + for id := range c.activeDerp { + ids = append(ids, id) + } + sort.Ints(ids) + for _, id := range ids { + fn(id, c.activeDerp[id]) + } +} + func (c *Conn) cleanStaleDerp() { c.mu.Lock() defer c.mu.Unlock() const inactivityTime = 60 * time.Second tooOld := time.Now().Add(-inactivityTime) + dirty := false for i, ad := range c.activeDerp { if i == c.myDerp { continue } if ad.lastWrite.Before(tooOld) { - c.logf("closing stale DERP connection to derp%v", i) - c.closeDerpLocked(i) + c.closeDerpLocked(i, "idle") + dirty = true } } + if dirty { + c.logActiveDerpLocked() + } } // DERPs reports the number of active DERP connections. @@ -1198,7 +1286,7 @@ func (c *Conn) Close() error { c.closed = true c.connCtxCancel() - c.closeAllDerpLocked() + c.closeAllDerpLocked("conn-close") if c.pconn6 != nil { c.pconn6.Close() } @@ -1687,3 +1775,14 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { return n, err } } + +// simpleDur rounds d such that it stringifies to something short. +func simpleDur(d time.Duration) time.Duration { + if d < time.Second { + return d.Round(time.Millisecond) + } + if d < time.Minute { + return d.Round(time.Second) + } + return d.Round(time.Minute) +}