From dad2181fd546e25933ea5105a72a0986a08dc0ec Mon Sep 17 00:00:00 2001 From: Jordan Whited Date: Wed, 17 Dec 2025 12:57:16 -0800 Subject: [PATCH] net/udprelay: per packet batch example Signed-off-by: Jordan Whited --- net/udprelay/metrics.go | 21 +++++++++------------ net/udprelay/server.go | 37 +++++++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/net/udprelay/metrics.go b/net/udprelay/metrics.go index 3bd12edd6..d36a74d9a 100644 --- a/net/udprelay/metrics.go +++ b/net/udprelay/metrics.go @@ -5,7 +5,6 @@ package udprelay import ( "expvar" - "net/netip" "tailscale.com/util/clientmetric" "tailscale.com/util/usermetric" @@ -103,28 +102,26 @@ func (m *metrics) addEndpoints(value int) { metricEndpoints.Add(int64(value)) } -func (m *metrics) countForwarded(from, to netip.Addr, b []byte) { - in4, out4 := from.Is4(), to.Is4() - bytes := int64(len(b)) +func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) { if in4 && out4 { - m.forwarded44Packets.Add(1) + m.forwarded44Packets.Add(packets) m.forwarded44Bytes.Add(bytes) - metricForwarded44Packets.Add(1) + metricForwarded44Packets.Add(packets) metricForwarded44Bytes.Add(bytes) } else if in4 && !out4 { - m.forwarded46Packets.Add(1) + m.forwarded46Packets.Add(packets) m.forwarded46Bytes.Add(bytes) - metricForwarded46Packets.Add(1) + metricForwarded46Packets.Add(packets) metricForwarded46Bytes.Add(bytes) } else if !in4 && out4 { - m.forwarded64Packets.Add(1) + m.forwarded64Packets.Add(packets) m.forwarded64Bytes.Add(bytes) - metricForwarded64Packets.Add(1) + metricForwarded64Packets.Add(packets) metricForwarded64Bytes.Add(bytes) } else { - m.forwarded66Packets.Add(1) + m.forwarded66Packets.Add(packets) m.forwarded66Bytes.Add(bytes) - metricForwarded66Packets.Add(1) + metricForwarded66Packets.Add(packets) metricForwarded66Bytes.Add(bytes) } } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index b967b8fce..c0b0a980b 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -690,39 +690,39 @@ func (s *Server) endpointGCLoop() { } } -func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort) { +func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort, isDataPacket bool) { if stun.Is(b) && b[1] == 0x01 { // A b[1] value of 0x01 (STUN method binding) is sufficiently // non-overlapping with the Geneve header where the LSB is always 0 // (part of 6 "reserved" bits). s.netChecker.ReceiveSTUNPacket(b, from) - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } gh := packet.GeneveHeader{} err := gh.Decode(b) if err != nil { - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } e, ok := s.serverEndpointByVNI.Load(gh.VNI.Get()) if !ok { // unknown VNI - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } now := mono.Now() if gh.Control { if gh.Protocol != packet.GeneveProtocolDisco { // control packet, but not Disco - return nil, netip.AddrPort{} + return nil, netip.AddrPort{}, false } msg := b[packet.GeneveFixedHeaderLength:] secrets := s.getMACSecrets(now) - return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now) + write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now) + isDataPacket = false + return } write, to = e.(*serverEndpoint).handleDataPacket(from, b, now) - if write != nil { - s.metrics.countForwarded(from.Addr(), to.Addr(), write) - } + isDataPacket = true return } @@ -791,16 +791,30 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF return } + type forwardedDataMetrics struct { + bytes int64 + packets int64 + } + dataMetricsByOutAF := [2]forwardedDataMetrics{} // 0 = ipv4; 1 = ipv6 for _, msg := range msgs[:n] { if msg.N == 0 { continue } buf := msg.Buffers[0][:msg.N] from := msg.Addr.(*net.UDPAddr).AddrPort() - write, to := s.handlePacket(from, buf) + write, to, isDataPacket := s.handlePacket(from, buf) if !to.IsValid() { continue } + if isDataPacket { + if to.Addr().Is4() { + dataMetricsByOutAF[0].bytes += int64(len(write)) + dataMetricsByOutAF[0].packets++ + } else { + dataMetricsByOutAF[1].bytes += int64(len(write)) + dataMetricsByOutAF[1].packets++ + } + } if from.Addr().Is4() == to.Addr().Is4() || otherSocket != nil { buffs, ok := writeBuffsByDest[to] if !ok { @@ -831,6 +845,9 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF } delete(writeBuffsByDest, dest) } + + s.metrics.countForwarded(readFromSocketIsIPv4, false, dataMetricsByOutAF[0].bytes, dataMetricsByOutAF[0].packets) + s.metrics.countForwarded(readFromSocketIsIPv4, true, dataMetricsByOutAF[1].bytes, dataMetricsByOutAF[1].packets) } }