diff --git a/derp/derpserver/derpserver.go b/derp/derpserver/derpserver.go
index 0959a4729..816ed3f83 100644
--- a/derp/derpserver/derpserver.go
+++ b/derp/derpserver/derpserver.go
@@ -87,6 +87,12 @@ const (
 	defaultPerClientSendQueueDepth = 32 // default packets buffered for sending
 	DefaultTCPWiteTimeout          = 2 * time.Second
 	privilegedWriteTimeout         = 30 * time.Second // for clients with the mesh key
+
+	// notHereCacheTTL is how long we suppress repeated lookups and
+	// peerGone notifications for a destination that is not connected
+	// to this server. Must be longer than the client's disco retry
+	// interval (3-5s) to suppress at least one retry.
+	notHereCacheTTL = 10 * time.Second
 )

 func getPerClientSendQueueDepth() int {
@@ -140,37 +146,44 @@ type Server struct {
 	localClient local.Client

 	// Counters:
-	packetsSent, bytesSent     expvar.Int
-	packetsRecv, bytesRecv     expvar.Int
-	packetsRecvByKind          metrics.LabelMap
-	packetsRecvDisco           *expvar.Int
-	packetsRecvOther           *expvar.Int
-	_                          align64
-	packetsForwardedOut        expvar.Int
-	packetsForwardedIn         expvar.Int
-	peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
-	peerGoneNotHereFrames      expvar.Int // number of peer not here frames sent
-	gotPing                    expvar.Int // number of ping frames from client
-	sentPong                   expvar.Int // number of pong frames enqueued to client
-	accepts                    expvar.Int
-	curClients                 expvar.Int
-	curClientsNotIdeal         expvar.Int
-	curHomeClients             expvar.Int // ones with preferred
-	dupClientKeys              expvar.Int // current number of public keys we have 2+ connections for
-	dupClientConns             expvar.Int // current number of connections sharing a public key
-	dupClientConnTotal         expvar.Int // total number of accepted connections when a dup key existed
-	unknownFrames              expvar.Int
-	homeMovesIn                expvar.Int // established clients announce home server moves in
-	homeMovesOut               expvar.Int // established clients announce home server moves out
-	multiForwarderCreated      expvar.Int
-	multiForwarderDeleted      expvar.Int
-	removePktForwardOther      expvar.Int
-	sclientWriteTimeouts       expvar.Int
-	avgQueueDuration           *uint64 // In milliseconds; accessed atomically
-	tcpRtt                     metrics.LabelMap // histogram
-	meshUpdateBatchSize        *metrics.Histogram
-	meshUpdateLoopCount        *metrics.Histogram
-	bufferedWriteFrames        *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	packetsSent, bytesSent      expvar.Int
+	packetsRecv, bytesRecv      expvar.Int
+	packetsRecvByKind           metrics.LabelMap
+	packetsRecvDisco            *expvar.Int
+	packetsRecvOther            *expvar.Int
+	_                           align64
+	packetsForwardedOut         expvar.Int
+	packetsForwardedIn          expvar.Int
+	peerGoneDisconnectedFrames  expvar.Int // number of peer disconnected frames sent
+	peerGoneNotHereFrames       expvar.Int // number of peer not here frames sent
+	packetsDroppedCachedNotHere expvar.Int // number of packets dropped via not-here cache
+	gotPing                     expvar.Int // number of ping frames from client
+	sentPong                    expvar.Int // number of pong frames enqueued to client
+	accepts                     expvar.Int
+	curClients                  expvar.Int
+	curClientsNotIdeal          expvar.Int
+	curHomeClients              expvar.Int // ones with preferred
+	dupClientKeys               expvar.Int // current number of public keys we have 2+ connections for
+	dupClientConns              expvar.Int // current number of connections sharing a public key
+	dupClientConnTotal          expvar.Int // total number of accepted connections when a dup key existed
+	unknownFrames               expvar.Int
+	homeMovesIn                 expvar.Int // established clients announce home server moves in
+	homeMovesOut                expvar.Int // established clients announce home server moves out
+	multiForwarderCreated       expvar.Int
+	multiForwarderDeleted       expvar.Int
+	removePktForwardOther       expvar.Int
+	sclientWriteTimeouts        expvar.Int
+	avgQueueDuration            *uint64 // In milliseconds; accessed atomically
+	tcpRtt                      metrics.LabelMap // histogram
+	meshUpdateBatchSize         *metrics.Histogram
+	meshUpdateLoopCount         *metrics.Histogram
+	bufferedWriteFrames         *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+
+	// notHereCache is a server-wide cache of destination keys that are
+	// not connected to this server. Shared across all client goroutines
+	// to avoid repeated mutex-protected client map lookups.
+	// Key: key.NodePublic, Value: time.Time (when entry was added).
+	notHereCache sync.Map

 	// verifyClientsLocalTailscaled only accepts client connections to the DERP
 	// server if the clientKey is a known peer in the network, as specified by a
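The notHereCache declaration above uses the common lazy-expiry pattern over a sync.Map: each entry stores its insertion time, readers treat stale entries as absent and delete them on sight, and no background sweeper goroutine is needed. A minimal standalone sketch of the idea (illustrative only; ttlSet and its methods are not part of this change):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlSet is a concurrent set whose members expire lazily: a stale
// entry is removed only when a later lookup observes it.
type ttlSet struct {
	m   sync.Map // key -> time.Time (insertion time)
	ttl time.Duration
}

func (s *ttlSet) add(k string, now time.Time) { s.m.Store(k, now) }

func (s *ttlSet) contains(k string, now time.Time) bool {
	v, ok := s.m.Load(k)
	if !ok {
		return false
	}
	if now.Sub(v.(time.Time)) > s.ttl {
		s.m.Delete(k) // lazy expiry on read
		return false
	}
	return true
}

func main() {
	s := &ttlSet{ttl: 10 * time.Second}
	now := time.Now()
	s.add("peer", now)
	fmt.Println(s.contains("peer", now))                     // true
	fmt.Println(s.contains("peer", now.Add(11*time.Second))) // false; entry deleted
}
```

The trade-off, as in the patch, is that an entry written once and never looked up again lingers until a later read evicts it, or, here, until the peer actually connects and registerClient deletes the key.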
@@ -667,6 +680,10 @@ func (s *Server) ModifyTLSConfigToAddMetaCert(c *tls.Config) {
 // observe EOFs/timeouts) but won't send them frames on the assumption
 // that they're dead.
 func (s *Server) registerClient(c *sclient) {
+	// Invalidate the not-here cache so other clients sending to this
+	// key will do a fresh lookup and find the newly connected peer.
+	s.notHereCache.Delete(c.key)
+
 	s.mu.Lock()
 	defer s.mu.Unlock()

@@ -866,15 +883,21 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) {
 // requestPeerGoneWriteLimited sends a request to write a "peer gone"
 // frame, but only in reply to a disco packet, and only if we haven't
-// sent one recently.
+// already notified this client about this specific peer recently
+// (that per-peer suppression comes from the server's not-here cache,
+// which the caller must populate separately). The peerGoneLim
+// provides a per-client safety backstop against clients sending to
+// many unique absent destinations.
 func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason derp.PeerGoneReasonType) {
-	if disco.LooksLikeDiscoWrapper(contents) != true {
+	if !disco.LooksLikeDiscoWrapper(contents) {
 		return
 	}

-	if c.peerGoneLim.Allow() {
-		go c.requestPeerGoneWrite(peer, reason)
+	// Per-client safety backstop.
+	if !c.peerGoneLim.Allow() {
+		return
 	}
+
+	go c.requestPeerGoneWrite(peer, reason)
 }

 func (s *Server) addWatcher(c *sclient) {
@@ -952,7 +975,7 @@ func (s *Server) accept(ctx context.Context, nc derp.Conn, brw *bufio.ReadWriter
 		peerGone:       make(chan peerGoneMsg),
 		canMesh:        s.isMeshPeer(clientInfo),
 		isNotIdealConn: IdealNodeContextKey.Value(ctx) != "",
-		peerGoneLim:    rate.NewLimiter(rate.Every(time.Second), 3),
+		peerGoneLim:    rate.NewLimiter(rate.Every(time.Second), 30),
 	}

 	if c.canMesh {
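The peerGoneLim change above raises the burst from 3 to 30 while keeping the refill rate at one token per second. A token bucket from golang.org/x/time/rate starts full, so a client can trigger up to 30 peer-gone writes back to back and then roughly one per second thereafter. A small standalone sketch of those semantics (standard x/time/rate API only):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Same shape as the per-client limiter in the patch: at most 30
	// tokens, refilled at one per second, bucket initially full.
	lim := rate.NewLimiter(rate.Every(time.Second), 30)

	allowed := 0
	for range 40 {
		if lim.Allow() {
			allowed++
		}
	}
	fmt.Println(allowed) // 30: the initial burst; the loop is too fast for a refill

	time.Sleep(1100 * time.Millisecond)
	fmt.Println(lim.Allow()) // true: one token has been refilled
}
```

The larger burst makes sense with the not-here cache in place: the limiter is no longer the primary per-peer suppressor, only a backstop against a client spraying many distinct absent keys at once.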
@@ -1185,10 +1208,14 @@ func (c *sclient) handleFrameForwardPacket(ft derp.FrameType, fl uint32) error {
 func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
 	s := c.s

-	dstKey, contents, err := s.recvPacket(c.br, fl)
+	dstKey, contents, err := s.recvPacket(c, fl)
 	if err != nil {
 		return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
 	}
+	if contents == nil {
+		// Packet was dropped early by the not-here cache.
+		return nil
+	}

 	var fwd PacketForwarder
 	var dstLen int
@@ -1220,6 +1247,7 @@ func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
 			reason = dropReasonDupClient
 		} else {
 			c.requestPeerGoneWriteLimited(dstKey, contents, derp.PeerGoneReasonNotHere)
+			s.addNotHereCache(dstKey, s.clock.Now())
 		}
 		s.recordDrop(contents, c.key, dstKey, reason)
 		c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
@@ -1559,19 +1587,34 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
 	return clientKey, info, nil
 }

-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+// recvPacket reads a send-packet frame from c.br. If the destination
+// key is in the server's not-here cache, the payload is discarded
+// without allocation and contents is returned as nil.
+func (s *Server) recvPacket(c *sclient, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
 	if frameLen < derp.KeyLen {
 		return zpub, nil, errors.New("short send packet frame")
 	}
-	if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
+	if err := dstKey.ReadRawWithoutAllocating(c.br); err != nil {
 		return zpub, nil, err
 	}
 	packetLen := frameLen - derp.KeyLen
 	if packetLen > derp.MaxPacketSize {
 		return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, derp.MaxPacketSize)
 	}
+
+	// Check the not-here cache before reading the payload. This avoids
+	// the payload allocation, io.ReadFull, s.mu lock, and client map
+	// lookup for repeated packets to absent destinations.
+	if s.isNotHereCached(dstKey, s.clock.Now()) {
+		if _, err := c.br.Discard(int(packetLen)); err != nil {
+			return zpub, nil, err
+		}
+		s.packetsDroppedCachedNotHere.Add(1)
+		return dstKey, nil, nil
+	}
+
 	contents = make([]byte, packetLen)
-	if _, err := io.ReadFull(br, contents); err != nil {
+	if _, err := io.ReadFull(c.br, contents); err != nil {
 		return zpub, nil, err
 	}
 	s.packetsRecv.Add(1)
@@ -1685,6 +1728,25 @@ func (c *sclient) presentFlags() derp.PeerPresentFlags {
 	return f
 }

+// isNotHereCached reports whether dstKey is in the server's not-here
+// cache and the entry has not expired.
+func (s *Server) isNotHereCached(dstKey key.NodePublic, now time.Time) bool {
+	v, ok := s.notHereCache.Load(dstKey)
+	if !ok {
+		return false
+	}
+	if now.Sub(v.(time.Time)) > notHereCacheTTL {
+		s.notHereCache.Delete(dstKey)
+		return false
+	}
+	return true
+}
+
+// addNotHereCache records that dstKey is not connected to this server.
+func (s *Server) addNotHereCache(dstKey key.NodePublic, now time.Time) {
+	s.notHereCache.Store(dstKey, now)
+}
+
 // peerConnState represents whether a peer is connected to the server
 // or not.
 type peerConnState struct {
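The early-drop path in recvPacket above relies on bufio.Reader.Discard, which skips bytes inside the reader's buffer without ever copying them into a caller-visible slice, so the hot drop path does no per-packet allocation. A minimal standalone demonstration of the read-prefix-then-discard shape (standard library only; the strings here are illustrative):

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	br := bufio.NewReader(strings.NewReader("keybodyrest"))

	// Read a fixed-size prefix, as recvPacket reads the dest key.
	prefix := make([]byte, 3)
	if _, err := io.ReadFull(br, prefix); err != nil {
		panic(err)
	}

	// Skip a known-length payload we have decided to drop. Unlike
	// io.ReadFull into a fresh slice, Discard allocates nothing.
	if _, err := br.Discard(len("body")); err != nil {
		panic(err)
	}

	rest, err := io.ReadAll(br)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", prefix, rest) // key rest
}
```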
@@ -2233,6 +2295,7 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("sent_pong", &s.sentPong)
 	m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames)
 	m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames)
+	m.Set("packets_dropped_cached_not_here", &s.packetsDroppedCachedNotHere)
 	m.Set("packets_forwarded_out", &s.packetsForwardedOut)
 	m.Set("packets_forwarded_in", &s.packetsForwardedIn)
 	m.Set("multiforwarder_created", &s.multiForwarderCreated)
diff --git a/derp/derpserver/derpserver_test.go b/derp/derpserver/derpserver_test.go
index 7f956ba78..cb78b0a30 100644
--- a/derp/derpserver/derpserver_test.go
+++ b/derp/derpserver/derpserver_test.go
@@ -27,6 +27,7 @@ import (
 	"golang.org/x/time/rate"
 	"tailscale.com/derp"
 	"tailscale.com/derp/derpconst"
+	tsrate "tailscale.com/tstime/rate"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 )
@@ -680,12 +681,114 @@ func BenchmarkConcurrentStreams(b *testing.B) {
 	<-acceptDone
 }

+// loopReader is an io.Reader that repeats data infinitely.
+type loopReader struct {
+	data []byte
+	off  int
+}
+
+func (r *loopReader) Read(p []byte) (int, error) {
+	n := 0
+	for n < len(p) {
+		copied := copy(p[n:], r.data[r.off:])
+		n += copied
+		r.off += copied
+		if r.off >= len(r.data) {
+			r.off = 0
+		}
+	}
+	return n, nil
+}
+
 func BenchmarkSendRecv(b *testing.B) {
 	for _, size := range []int{10, 100, 1000, 10000} {
 		b.Run(fmt.Sprintf("msgsize=%d", size), func(b *testing.B) { benchmarkSendRecvSize(b, size) })
 	}
 }
+
+// BenchmarkHandleFrameSendPacketAbsent benchmarks the server-side
+// handleFrameSendPacket path directly, bypassing TLS and TCP to
+// isolate server processing cost. It compares cache hits (same absent
+// destination) vs cache misses (rotating through many absent
+// destinations).
+func BenchmarkHandleFrameSendPacketAbsent(b *testing.B) {
+	const payloadSize = 100
+	const frameSize = derp.KeyLen + payloadSize
+
+	setup := func(b *testing.B) *sclient {
+		b.Helper()
+		s := New(key.NewNode(), logger.Discard)
+		b.Cleanup(func() { s.Close() })
+		c := &sclient{
+			s:           s,
+			key:         key.NewNode().Public(),
+			logf:        logger.Discard,
+			done:        make(chan struct{}),
+			peerGone:    make(chan peerGoneMsg, 100),
+			peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
+		}
+		return c
+	}
+
+	// buildFrames builds a byte buffer containing n frame payloads.
+	// If sameKey, all frames use the same destination key (cache hit
+	// after the first). Otherwise, each frame uses a distinct key
+	// (cache miss).
+	buildFrames := func(n int, sameKey bool) []byte {
+		buf := make([]byte, n*frameSize)
+		var fixedKey []byte
+		if sameKey {
+			fixedKey = key.NewNode().Public().AppendTo(nil)
+		}
+		for i := range n {
+			off := i * frameSize
+			if sameKey {
+				copy(buf[off:], fixedKey)
+			} else {
+				k := key.NewNode().Public()
+				copy(buf[off:], k.AppendTo(nil))
+			}
+		}
+		return buf
+	}
+
+	// same_key: all packets to the same absent peer (cache hit after
+	// the first).
+	b.Run("same_key", func(b *testing.B) {
+		c := setup(b)
+		frame := buildFrames(1, true)
+		// Use an infinite reader that repeats the same frame.
+		c.br = bufio.NewReader(&loopReader{data: frame})
+
+		b.SetBytes(int64(payloadSize))
+		b.ReportAllocs()
+		b.ResetTimer()
+		for range b.N {
+			if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+
+	// unique_keys: each packet to a different absent peer (cache miss
+	// on the first pass). Uses a pool of 1000 keys cycling, each seen
+	// once per 1000 iterations. With the 10s TTL, cache entries from
+	// earlier cycles are still valid, so this measures a
+	// cache-miss-then-hit pattern.
+	b.Run("unique_keys", func(b *testing.B) {
+		const numKeys = 1000
+		c := setup(b)
+		pool := buildFrames(numKeys, false)
+		c.br = bufio.NewReader(&loopReader{data: pool})
+
+		b.SetBytes(int64(payloadSize))
+		b.ReportAllocs()
+		b.ResetTimer()
+		for range b.N {
+			if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+}
+
 func benchmarkSendRecvSize(b *testing.B, packetSize int) {
 	serverPrivateKey := key.NewNode()
 	s := New(serverPrivateKey, logger.Discard)
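The benchmark can be run in isolation with standard tooling, e.g. `go test ./derp/derpserver -run '^$' -bench HandleFrameSendPacketAbsent -benchmem`. The `-benchmem` flag is the interesting one: the cache-hit path exists to avoid the per-packet `make([]byte, packetLen)`, so same_key should report near-zero allocs/op once the first frame has populated the cache.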
@@ -973,3 +1076,102 @@ func BenchmarkSenderCardinalityOverhead(b *testing.B) {
 		}
 	})
 }
+
+func TestNotHereCache(t *testing.T) {
+	s := New(key.NewNode(), t.Logf)
+	defer s.Close()
+
+	absentKey := key.NewNode().Public()
+	now := time.Now()
+
+	// Initially not cached.
+	if s.isNotHereCached(absentKey, now) {
+		t.Fatal("expected not cached initially")
+	}
+
+	// Add to cache.
+	s.addNotHereCache(absentKey, now)
+	if !s.isNotHereCached(absentKey, now) {
+		t.Fatal("expected cached after add")
+	}
+
+	// Still cached just before the TTL expires.
+	if !s.isNotHereCached(absentKey, now.Add(notHereCacheTTL-time.Millisecond)) {
+		t.Fatal("expected cached just before TTL")
+	}
+
+	// Expired after the TTL.
+	if s.isNotHereCached(absentKey, now.Add(notHereCacheTTL+time.Millisecond)) {
+		t.Fatal("expected expired after TTL")
+	}
+
+	// Re-add and verify invalidation on registerClient.
+	s.addNotHereCache(absentKey, time.Now())
+	if !s.isNotHereCached(absentKey, time.Now()) {
+		t.Fatal("expected cached after re-add")
+	}
+
+	c := &sclient{
+		s:              s,
+		key:            absentKey,
+		logf:           logger.WithPrefix(t.Logf, "test: "),
+		done:           make(chan struct{}),
+		peerGone:       make(chan peerGoneMsg),
+		sendQueue:      make(chan pkt, 1),
+		discoSendQueue: make(chan pkt, 1),
+		sendPongCh:     make(chan [8]byte, 1),
+	}
+	s.registerClient(c)
+	defer s.unregisterClient(c)
+
+	if s.isNotHereCached(absentKey, time.Now()) {
+		t.Fatal("expected cache invalidated after registerClient")
+	}
+}
+
+func TestNotHereCacheRecvPacket(t *testing.T) {
+	s := New(key.NewNode(), t.Logf)
+	defer s.Close()
+
+	absentKey := key.NewNode().Public()
+
+	// Build a send-packet frame: 32-byte dest key followed by a
+	// 100-byte payload.
+	frame := make([]byte, derp.KeyLen+100)
+	copy(frame[:derp.KeyLen], absentKey.AppendTo(nil))
+
+	c := &sclient{
+		s:           s,
+		key:         key.NewNode().Public(),
+		logf:        logger.WithPrefix(t.Logf, "test: "),
+		done:        make(chan struct{}),
+		peerGone:    make(chan peerGoneMsg, 100),
+		peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
+	}
+
+	// First call: cache miss, should return the contents.
+	c.br = bufio.NewReader(&loopReader{data: frame})
+	_, contents, err := s.recvPacket(c, uint32(len(frame)))
+	if err != nil {
+		t.Fatalf("recvPacket (miss): %v", err)
+	}
+	if contents == nil {
+		t.Fatal("expected non-nil contents on cache miss")
+	}
+
+	// Populate the cache as handleFrameSendPacket would.
+	s.addNotHereCache(absentKey, s.clock.Now())
+
+	// Second call: cache hit, should return nil contents.
+	_, contents, err = s.recvPacket(c, uint32(len(frame)))
+	if err != nil {
+		t.Fatalf("recvPacket (hit): %v", err)
+	}
+	if contents != nil {
+		t.Fatal("expected nil contents on cache hit")
+	}
+
+	// Verify the metric incremented.
+	if got := s.packetsDroppedCachedNotHere.Value(); got != 1 {
+		t.Fatalf("packetsDroppedCachedNotHere = %d, want 1", got)
+	}
+}
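The new tests can likewise be run on their own with `go test ./derp/derpserver -run TestNotHereCache -v`; since `-run` takes a regular expression and the first name is a prefix of the second, that single pattern covers both TestNotHereCache and TestNotHereCacheRecvPacket.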