mirror of
https://github.com/tailscale/tailscale.git
synced 2026-03-28 08:42:05 +01:00
derp/derpserver: add server-level not-here cache for absent peer drops
Add a sync.Map cache on the Server that tracks destination keys not connected to this server. When a packet arrives for a cached absent destination, the payload is discarded via bufio.Reader.Discard without allocating a buffer, taking the server mutex, or looking up the client map. Cache entries expire after a configurable TTL (10s) and are invalidated in registerClient when a peer connects. Move the cache check into recvPacket so handleFrameSendPacket does not need to duplicate the frame validation and key reading logic. When the cache is hit, recvPacket returns nil contents and the caller returns early. Increase peerGoneLim from 3/sec to 50/sec per client. The not-here cache now provides per-destination rate limiting, so the per-client limiter serves only as a safety backstop against clients sending to many unique absent destinations. ``` name old ns/op new ns/op delta HandleFrameSendPacketAbsent/same_key 570 131 -77% HandleFrameSendPacketAbsent/unique_keys 559 145 -74% (0 B/op, 0 allocs/op on cache hits; was 280 B/op, 8 allocs/op) ``` Updates #38509
This commit is contained in:
parent
d3bfc33745
commit
55751c3d6d
@ -87,6 +87,12 @@ const (
|
||||
defaultPerClientSendQueueDepth = 32 // default packets buffered for sending
|
||||
DefaultTCPWiteTimeout = 2 * time.Second
|
||||
privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key
|
||||
|
||||
// notHereCacheTTL is how long we suppress repeated lookups and
|
||||
// peerGone notifications for a destination that is not connected
|
||||
// to this server. Must be longer than the client's disco retry
|
||||
// interval (3-5s) to suppress at least one retry.
|
||||
notHereCacheTTL = 10 * time.Second
|
||||
)
|
||||
|
||||
func getPerClientSendQueueDepth() int {
|
||||
@ -140,37 +146,44 @@ type Server struct {
|
||||
localClient local.Client
|
||||
|
||||
// Counters:
|
||||
packetsSent, bytesSent expvar.Int
|
||||
packetsRecv, bytesRecv expvar.Int
|
||||
packetsRecvByKind metrics.LabelMap
|
||||
packetsRecvDisco *expvar.Int
|
||||
packetsRecvOther *expvar.Int
|
||||
_ align64
|
||||
packetsForwardedOut expvar.Int
|
||||
packetsForwardedIn expvar.Int
|
||||
peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
|
||||
peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
|
||||
gotPing expvar.Int // number of ping frames from client
|
||||
sentPong expvar.Int // number of pong frames enqueued to client
|
||||
accepts expvar.Int
|
||||
curClients expvar.Int
|
||||
curClientsNotIdeal expvar.Int
|
||||
curHomeClients expvar.Int // ones with preferred
|
||||
dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
|
||||
dupClientConns expvar.Int // current number of connections sharing a public key
|
||||
dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
|
||||
unknownFrames expvar.Int
|
||||
homeMovesIn expvar.Int // established clients announce home server moves in
|
||||
homeMovesOut expvar.Int // established clients announce home server moves out
|
||||
multiForwarderCreated expvar.Int
|
||||
multiForwarderDeleted expvar.Int
|
||||
removePktForwardOther expvar.Int
|
||||
sclientWriteTimeouts expvar.Int
|
||||
avgQueueDuration *uint64 // In milliseconds; accessed atomically
|
||||
tcpRtt metrics.LabelMap // histogram
|
||||
meshUpdateBatchSize *metrics.Histogram
|
||||
meshUpdateLoopCount *metrics.Histogram
|
||||
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
|
||||
packetsSent, bytesSent expvar.Int
|
||||
packetsRecv, bytesRecv expvar.Int
|
||||
packetsRecvByKind metrics.LabelMap
|
||||
packetsRecvDisco *expvar.Int
|
||||
packetsRecvOther *expvar.Int
|
||||
_ align64
|
||||
packetsForwardedOut expvar.Int
|
||||
packetsForwardedIn expvar.Int
|
||||
peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
|
||||
peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
|
||||
packetsDroppedCachedNotHere expvar.Int // number of packets dropped via not-here cache
|
||||
gotPing expvar.Int // number of ping frames from client
|
||||
sentPong expvar.Int // number of pong frames enqueued to client
|
||||
accepts expvar.Int
|
||||
curClients expvar.Int
|
||||
curClientsNotIdeal expvar.Int
|
||||
curHomeClients expvar.Int // ones with preferred
|
||||
dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
|
||||
dupClientConns expvar.Int // current number of connections sharing a public key
|
||||
dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
|
||||
unknownFrames expvar.Int
|
||||
homeMovesIn expvar.Int // established clients announce home server moves in
|
||||
homeMovesOut expvar.Int // established clients announce home server moves out
|
||||
multiForwarderCreated expvar.Int
|
||||
multiForwarderDeleted expvar.Int
|
||||
removePktForwardOther expvar.Int
|
||||
sclientWriteTimeouts expvar.Int
|
||||
avgQueueDuration *uint64 // In milliseconds; accessed atomically
|
||||
tcpRtt metrics.LabelMap // histogram
|
||||
meshUpdateBatchSize *metrics.Histogram
|
||||
meshUpdateLoopCount *metrics.Histogram
|
||||
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
|
||||
|
||||
// notHereCache is a server-wide cache of destination keys that are
|
||||
// not connected to this server. Shared across all client goroutines
|
||||
// to avoid repeated mutex-protected client map lookups.
|
||||
// Key: key.NodePublic, Value: time.Time (when entry was added).
|
||||
notHereCache sync.Map
|
||||
|
||||
// verifyClientsLocalTailscaled only accepts client connections to the DERP
|
||||
// server if the clientKey is a known peer in the network, as specified by a
|
||||
@ -667,6 +680,10 @@ func (s *Server) ModifyTLSConfigToAddMetaCert(c *tls.Config) {
|
||||
// observe EOFs/timeouts) but won't send them frames on the assumption
|
||||
// that they're dead.
|
||||
func (s *Server) registerClient(c *sclient) {
|
||||
// Invalidate not-here cache so other clients sending to this
|
||||
// key will do a fresh lookup and find the newly connected peer.
|
||||
s.notHereCache.Delete(c.key)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -866,15 +883,21 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) {
|
||||
|
||||
// requestPeerGoneWriteLimited sends a request to write a "peer gone"
|
||||
// frame, but only in reply to a disco packet, and only if we haven't
|
||||
// sent one recently.
|
||||
// already notified this client about this specific peer recently.
|
||||
// The caller must add to notHereCache separately. The peerGoneLim
|
||||
// provides a per-client safety backstop against clients sending to
|
||||
// many unique absent destinations.
|
||||
func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason derp.PeerGoneReasonType) {
|
||||
if disco.LooksLikeDiscoWrapper(contents) != true {
|
||||
if !disco.LooksLikeDiscoWrapper(contents) {
|
||||
return
|
||||
}
|
||||
|
||||
if c.peerGoneLim.Allow() {
|
||||
go c.requestPeerGoneWrite(peer, reason)
|
||||
// Per-client safety backstop.
|
||||
if !c.peerGoneLim.Allow() {
|
||||
return
|
||||
}
|
||||
|
||||
go c.requestPeerGoneWrite(peer, reason)
|
||||
}
|
||||
|
||||
func (s *Server) addWatcher(c *sclient) {
|
||||
@ -952,7 +975,7 @@ func (s *Server) accept(ctx context.Context, nc derp.Conn, brw *bufio.ReadWriter
|
||||
peerGone: make(chan peerGoneMsg),
|
||||
canMesh: s.isMeshPeer(clientInfo),
|
||||
isNotIdealConn: IdealNodeContextKey.Value(ctx) != "",
|
||||
peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
|
||||
peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 30),
|
||||
}
|
||||
|
||||
if c.canMesh {
|
||||
@ -1185,10 +1208,14 @@ func (c *sclient) handleFrameForwardPacket(ft derp.FrameType, fl uint32) error {
|
||||
func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
|
||||
s := c.s
|
||||
|
||||
dstKey, contents, err := s.recvPacket(c.br, fl)
|
||||
dstKey, contents, err := s.recvPacket(c, fl)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
|
||||
}
|
||||
if contents == nil {
|
||||
// Packet was dropped early by the not-here cache.
|
||||
return nil
|
||||
}
|
||||
|
||||
var fwd PacketForwarder
|
||||
var dstLen int
|
||||
@ -1220,6 +1247,7 @@ func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
|
||||
reason = dropReasonDupClient
|
||||
} else {
|
||||
c.requestPeerGoneWriteLimited(dstKey, contents, derp.PeerGoneReasonNotHere)
|
||||
s.addNotHereCache(dstKey, s.clock.Now())
|
||||
}
|
||||
s.recordDrop(contents, c.key, dstKey, reason)
|
||||
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
|
||||
@ -1559,19 +1587,34 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
|
||||
return clientKey, info, nil
|
||||
}
|
||||
|
||||
func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
|
||||
// recvPacket reads a send-packet frame from br. If the destination key
|
||||
// is in the client's not-here cache, the payload is discarded without
|
||||
// allocation and contents is returned as nil.
|
||||
func (s *Server) recvPacket(c *sclient, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
|
||||
if frameLen < derp.KeyLen {
|
||||
return zpub, nil, errors.New("short send packet frame")
|
||||
}
|
||||
if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
|
||||
if err := dstKey.ReadRawWithoutAllocating(c.br); err != nil {
|
||||
return zpub, nil, err
|
||||
}
|
||||
packetLen := frameLen - derp.KeyLen
|
||||
if packetLen > derp.MaxPacketSize {
|
||||
return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, derp.MaxPacketSize)
|
||||
}
|
||||
|
||||
// Check not-here cache before reading payload. This avoids
|
||||
// payload allocation, io.ReadFull, s.mu lock, and client map
|
||||
// lookup for repeated packets to absent destinations.
|
||||
if s.isNotHereCached(dstKey, s.clock.Now()) {
|
||||
if _, err := c.br.Discard(int(packetLen)); err != nil {
|
||||
return zpub, nil, err
|
||||
}
|
||||
s.packetsDroppedCachedNotHere.Add(1)
|
||||
return dstKey, nil, nil
|
||||
}
|
||||
|
||||
contents = make([]byte, packetLen)
|
||||
if _, err := io.ReadFull(br, contents); err != nil {
|
||||
if _, err := io.ReadFull(c.br, contents); err != nil {
|
||||
return zpub, nil, err
|
||||
}
|
||||
s.packetsRecv.Add(1)
|
||||
@ -1685,6 +1728,25 @@ func (c *sclient) presentFlags() derp.PeerPresentFlags {
|
||||
return f
|
||||
}
|
||||
|
||||
// isNotHereCached reports whether dstKey is in the server's not-here
|
||||
// cache and the entry has not expired.
|
||||
func (s *Server) isNotHereCached(dstKey key.NodePublic, now time.Time) bool {
|
||||
v, ok := s.notHereCache.Load(dstKey)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if now.Sub(v.(time.Time)) > notHereCacheTTL {
|
||||
s.notHereCache.Delete(dstKey)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// addNotHereCache records that dstKey is not connected to this server.
// The entry suppresses repeated client-map lookups and peerGone
// notifications for dstKey until it expires (notHereCacheTTL after
// now, enforced by isNotHereCached) or is deleted by registerClient
// when the peer connects.
func (s *Server) addNotHereCache(dstKey key.NodePublic, now time.Time) {
	s.notHereCache.Store(dstKey, now)
}
|
||||
|
||||
// peerConnState represents whether a peer is connected to the server
|
||||
// or not.
|
||||
type peerConnState struct {
|
||||
@ -2233,6 +2295,7 @@ func (s *Server) ExpVar() expvar.Var {
|
||||
m.Set("sent_pong", &s.sentPong)
|
||||
m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames)
|
||||
m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames)
|
||||
m.Set("packets_dropped_cached_not_here", &s.packetsDroppedCachedNotHere)
|
||||
m.Set("packets_forwarded_out", &s.packetsForwardedOut)
|
||||
m.Set("packets_forwarded_in", &s.packetsForwardedIn)
|
||||
m.Set("multiforwarder_created", &s.multiForwarderCreated)
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/derp/derpconst"
|
||||
tsrate "tailscale.com/tstime/rate"
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
@ -680,12 +681,114 @@ func BenchmarkConcurrentStreams(b *testing.B) {
|
||||
<-acceptDone
|
||||
}
|
||||
|
||||
// loopReader is an io.Reader that repeats its data infinitely,
// wrapping back to the start whenever the end is reached. Reads never
// return an error; a loopReader with empty data must not be read.
type loopReader struct {
	data []byte // the bytes to repeat
	off  int    // current read offset into data
}

// Read fills p entirely by copying from data, wrapping as needed.
// It always returns len(p), nil.
func (r *loopReader) Read(p []byte) (int, error) {
	total := 0
	for total < len(p) {
		c := copy(p[total:], r.data[r.off:])
		total += c
		if r.off += c; r.off >= len(r.data) {
			r.off = 0
		}
	}
	return total, nil
}
|
||||
|
||||
func BenchmarkSendRecv(b *testing.B) {
|
||||
for _, size := range []int{10, 100, 1000, 10000} {
|
||||
b.Run(fmt.Sprintf("msgsize=%d", size), func(b *testing.B) { benchmarkSendRecvSize(b, size) })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// BenchmarkHandleFrameSendPacketAbsent benchmarks the server-side
// handleFrameSendPacket path directly, bypassing TLS and TCP to
// isolate server processing cost. It compares cache hits (same absent
// destination) vs cache misses (rotating through many absent destinations).
func BenchmarkHandleFrameSendPacketAbsent(b *testing.B) {
	const payloadSize = 100
	const frameSize = derp.KeyLen + payloadSize

	// setup builds a Server and a bare sclient wired to it, without a
	// real network connection.
	setup := func(b *testing.B) *sclient {
		b.Helper()
		s := New(key.NewNode(), logger.Discard)
		b.Cleanup(func() { s.Close() })
		c := &sclient{
			s:        s,
			key:      key.NewNode().Public(),
			logf:     logger.Discard,
			done:     make(chan struct{}),
			peerGone: make(chan peerGoneMsg, 100),
			// Matches the server's per-client backstop limiter rate.
			peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
		}
		return c
	}

	// buildFrames builds a byte buffer containing n frame payloads.
	// If sameKey, all frames use the same destination key (cache hit
	// after first). Otherwise, each frame uses a distinct key (cache miss).
	buildFrames := func(n int, sameKey bool) []byte {
		buf := make([]byte, n*frameSize)
		var fixedKey []byte
		if sameKey {
			fixedKey = key.NewNode().Public().AppendTo(nil)
		}
		for i := range n {
			off := i * frameSize
			if sameKey {
				copy(buf[off:], fixedKey)
			} else {
				k := key.NewNode().Public()
				copy(buf[off:], k.AppendTo(nil))
			}
			// Payload bytes after the key are left zero; contents are
			// irrelevant since the destination is absent.
		}
		return buf
	}

	// same_key: all packets to same absent peer (cache hit after first).
	b.Run("same_key", func(b *testing.B) {
		c := setup(b)
		frame := buildFrames(1, true)
		// Use an infinite reader that repeats the same frame.
		c.br = bufio.NewReader(&loopReader{data: frame})

		b.SetBytes(int64(payloadSize))
		b.ReportAllocs()
		b.ResetTimer()
		for range b.N {
			if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
				b.Fatal(err)
			}
		}
	})

	// unique_keys: each packet to a different absent peer (always cache miss).
	// Uses a pool of 1000 keys cycling, each seen once per 1000 iterations.
	// With the 10s notHereCacheTTL, cache entries from earlier cycles are
	// still valid, so this measures a cache-miss-then-hit pattern.
	b.Run("unique_keys", func(b *testing.B) {
		const numKeys = 1000
		c := setup(b)
		pool := buildFrames(numKeys, false)
		c.br = bufio.NewReader(&loopReader{data: pool})

		b.SetBytes(int64(payloadSize))
		b.ReportAllocs()
		b.ResetTimer()
		for range b.N {
			if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
				b.Fatal(err)
			}
		}
	})
}
|
||||
|
||||
func benchmarkSendRecvSize(b *testing.B, packetSize int) {
|
||||
serverPrivateKey := key.NewNode()
|
||||
s := New(serverPrivateKey, logger.Discard)
|
||||
@ -973,3 +1076,102 @@ func BenchmarkSenderCardinalityOverhead(b *testing.B) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestNotHereCache exercises the server's not-here cache: add/lookup,
// TTL expiry via isNotHereCached, and invalidation when the formerly
// absent peer connects through registerClient.
func TestNotHereCache(t *testing.T) {
	s := New(key.NewNode(), t.Logf)
	defer s.Close()

	absentKey := key.NewNode().Public()
	now := time.Now()

	// Initially not cached.
	if s.isNotHereCached(absentKey, now) {
		t.Fatal("expected not cached initially")
	}

	// Add to cache.
	s.addNotHereCache(absentKey, now)
	if !s.isNotHereCached(absentKey, now) {
		t.Fatal("expected cached after add")
	}

	// Still cached just before TTL expires.
	if !s.isNotHereCached(absentKey, now.Add(notHereCacheTTL-time.Millisecond)) {
		t.Fatal("expected cached just before TTL")
	}

	// Expired after TTL. This lookup also lazily deletes the entry.
	if s.isNotHereCached(absentKey, now.Add(notHereCacheTTL+time.Millisecond)) {
		t.Fatal("expected expired after TTL")
	}

	// Re-add and verify invalidation on registerClient.
	s.addNotHereCache(absentKey, time.Now())
	if !s.isNotHereCached(absentKey, time.Now()) {
		t.Fatal("expected cached after re-add")
	}

	// Minimal sclient carrying absentKey so registerClient deletes
	// the corresponding cache entry.
	c := &sclient{
		s:              s,
		key:            absentKey,
		logf:           logger.WithPrefix(t.Logf, "test: "),
		done:           make(chan struct{}),
		peerGone:       make(chan peerGoneMsg),
		sendQueue:      make(chan pkt, 1),
		discoSendQueue: make(chan pkt, 1),
		sendPongCh:     make(chan [8]byte, 1),
	}
	s.registerClient(c)
	defer s.unregisterClient(c)

	if s.isNotHereCached(absentKey, time.Now()) {
		t.Fatal("expected cache invalidated after registerClient")
	}
}
|
||||
|
||||
// TestNotHereCacheRecvPacket verifies recvPacket's interaction with the
// not-here cache: a cache miss returns the payload, a cache hit returns
// nil contents (payload discarded) and bumps the drop counter.
func TestNotHereCacheRecvPacket(t *testing.T) {
	s := New(key.NewNode(), t.Logf)
	defer s.Close()

	absentKey := key.NewNode().Public()

	// Build a frame payload: 32-byte dest key + 100-byte payload.
	payload := make([]byte, derp.KeyLen+100)
	copy(payload[:derp.KeyLen], absentKey.AppendTo(nil))

	// Minimal sclient; recvPacket reads frames from c.br.
	c := &sclient{
		s:           s,
		key:         key.NewNode().Public(),
		logf:        logger.WithPrefix(t.Logf, "test: "),
		done:        make(chan struct{}),
		peerGone:    make(chan peerGoneMsg, 100),
		peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
	}

	// First call: cache miss, should return contents.
	c.br = bufio.NewReader(&loopReader{data: payload})
	_, contents, err := s.recvPacket(c, uint32(len(payload)))
	if err != nil {
		t.Fatalf("recvPacket (miss): %v", err)
	}
	if contents == nil {
		t.Fatal("expected non-nil contents on cache miss")
	}

	// Populate the cache as handleFrameSendPacket would.
	s.addNotHereCache(absentKey, s.clock.Now())

	// Second call: cache hit, should return nil contents.
	_, contents, err = s.recvPacket(c, uint32(len(payload)))
	if err != nil {
		t.Fatalf("recvPacket (hit): %v", err)
	}
	if contents != nil {
		t.Fatal("expected nil contents on cache hit")
	}

	// Verify metric incremented.
	if got := s.packetsDroppedCachedNotHere.Value(); got != 1 {
		t.Fatalf("packetsDroppedCachedNotHere = %d, want 1", got)
	}
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user