mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-23 10:12:07 +01:00
net/udprelay: per packet batch example
Signed-off-by: Jordan Whited <jordan@tailscale.com>
This commit is contained in:
parent
8476aa8dd5
commit
dad2181fd5
@ -5,7 +5,6 @@ package udprelay
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"expvar"
|
"expvar"
|
||||||
"net/netip"
|
|
||||||
|
|
||||||
"tailscale.com/util/clientmetric"
|
"tailscale.com/util/clientmetric"
|
||||||
"tailscale.com/util/usermetric"
|
"tailscale.com/util/usermetric"
|
||||||
@ -103,28 +102,26 @@ func (m *metrics) addEndpoints(value int) {
|
|||||||
metricEndpoints.Add(int64(value))
|
metricEndpoints.Add(int64(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metrics) countForwarded(from, to netip.Addr, b []byte) {
|
func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
|
||||||
in4, out4 := from.Is4(), to.Is4()
|
|
||||||
bytes := int64(len(b))
|
|
||||||
if in4 && out4 {
|
if in4 && out4 {
|
||||||
m.forwarded44Packets.Add(1)
|
m.forwarded44Packets.Add(packets)
|
||||||
m.forwarded44Bytes.Add(bytes)
|
m.forwarded44Bytes.Add(bytes)
|
||||||
metricForwarded44Packets.Add(1)
|
metricForwarded44Packets.Add(packets)
|
||||||
metricForwarded44Bytes.Add(bytes)
|
metricForwarded44Bytes.Add(bytes)
|
||||||
} else if in4 && !out4 {
|
} else if in4 && !out4 {
|
||||||
m.forwarded46Packets.Add(1)
|
m.forwarded46Packets.Add(packets)
|
||||||
m.forwarded46Bytes.Add(bytes)
|
m.forwarded46Bytes.Add(bytes)
|
||||||
metricForwarded46Packets.Add(1)
|
metricForwarded46Packets.Add(packets)
|
||||||
metricForwarded46Bytes.Add(bytes)
|
metricForwarded46Bytes.Add(bytes)
|
||||||
} else if !in4 && out4 {
|
} else if !in4 && out4 {
|
||||||
m.forwarded64Packets.Add(1)
|
m.forwarded64Packets.Add(packets)
|
||||||
m.forwarded64Bytes.Add(bytes)
|
m.forwarded64Bytes.Add(bytes)
|
||||||
metricForwarded64Packets.Add(1)
|
metricForwarded64Packets.Add(packets)
|
||||||
metricForwarded64Bytes.Add(bytes)
|
metricForwarded64Bytes.Add(bytes)
|
||||||
} else {
|
} else {
|
||||||
m.forwarded66Packets.Add(1)
|
m.forwarded66Packets.Add(packets)
|
||||||
m.forwarded66Bytes.Add(bytes)
|
m.forwarded66Bytes.Add(bytes)
|
||||||
metricForwarded66Packets.Add(1)
|
metricForwarded66Packets.Add(packets)
|
||||||
metricForwarded66Bytes.Add(bytes)
|
metricForwarded66Bytes.Add(bytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -690,39 +690,39 @@ func (s *Server) endpointGCLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort) {
|
func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to netip.AddrPort, isDataPacket bool) {
|
||||||
if stun.Is(b) && b[1] == 0x01 {
|
if stun.Is(b) && b[1] == 0x01 {
|
||||||
// A b[1] value of 0x01 (STUN method binding) is sufficiently
|
// A b[1] value of 0x01 (STUN method binding) is sufficiently
|
||||||
// non-overlapping with the Geneve header where the LSB is always 0
|
// non-overlapping with the Geneve header where the LSB is always 0
|
||||||
// (part of 6 "reserved" bits).
|
// (part of 6 "reserved" bits).
|
||||||
s.netChecker.ReceiveSTUNPacket(b, from)
|
s.netChecker.ReceiveSTUNPacket(b, from)
|
||||||
return nil, netip.AddrPort{}
|
return nil, netip.AddrPort{}, false
|
||||||
}
|
}
|
||||||
gh := packet.GeneveHeader{}
|
gh := packet.GeneveHeader{}
|
||||||
err := gh.Decode(b)
|
err := gh.Decode(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, netip.AddrPort{}
|
return nil, netip.AddrPort{}, false
|
||||||
}
|
}
|
||||||
e, ok := s.serverEndpointByVNI.Load(gh.VNI.Get())
|
e, ok := s.serverEndpointByVNI.Load(gh.VNI.Get())
|
||||||
if !ok {
|
if !ok {
|
||||||
// unknown VNI
|
// unknown VNI
|
||||||
return nil, netip.AddrPort{}
|
return nil, netip.AddrPort{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
now := mono.Now()
|
now := mono.Now()
|
||||||
if gh.Control {
|
if gh.Control {
|
||||||
if gh.Protocol != packet.GeneveProtocolDisco {
|
if gh.Protocol != packet.GeneveProtocolDisco {
|
||||||
// control packet, but not Disco
|
// control packet, but not Disco
|
||||||
return nil, netip.AddrPort{}
|
return nil, netip.AddrPort{}, false
|
||||||
}
|
}
|
||||||
msg := b[packet.GeneveFixedHeaderLength:]
|
msg := b[packet.GeneveFixedHeaderLength:]
|
||||||
secrets := s.getMACSecrets(now)
|
secrets := s.getMACSecrets(now)
|
||||||
return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
|
write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
|
||||||
|
isDataPacket = false
|
||||||
|
return
|
||||||
}
|
}
|
||||||
write, to = e.(*serverEndpoint).handleDataPacket(from, b, now)
|
write, to = e.(*serverEndpoint).handleDataPacket(from, b, now)
|
||||||
if write != nil {
|
isDataPacket = true
|
||||||
s.metrics.countForwarded(from.Addr(), to.Addr(), write)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -791,16 +791,30 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type forwardedDataMetrics struct {
|
||||||
|
bytes int64
|
||||||
|
packets int64
|
||||||
|
}
|
||||||
|
dataMetricsByOutAF := [2]forwardedDataMetrics{} // 0 = ipv4; 1 = ipv6
|
||||||
for _, msg := range msgs[:n] {
|
for _, msg := range msgs[:n] {
|
||||||
if msg.N == 0 {
|
if msg.N == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
buf := msg.Buffers[0][:msg.N]
|
buf := msg.Buffers[0][:msg.N]
|
||||||
from := msg.Addr.(*net.UDPAddr).AddrPort()
|
from := msg.Addr.(*net.UDPAddr).AddrPort()
|
||||||
write, to := s.handlePacket(from, buf)
|
write, to, isDataPacket := s.handlePacket(from, buf)
|
||||||
if !to.IsValid() {
|
if !to.IsValid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if isDataPacket {
|
||||||
|
if to.Addr().Is4() {
|
||||||
|
dataMetricsByOutAF[0].bytes += int64(len(write))
|
||||||
|
dataMetricsByOutAF[0].packets++
|
||||||
|
} else {
|
||||||
|
dataMetricsByOutAF[1].bytes += int64(len(write))
|
||||||
|
dataMetricsByOutAF[1].packets++
|
||||||
|
}
|
||||||
|
}
|
||||||
if from.Addr().Is4() == to.Addr().Is4() || otherSocket != nil {
|
if from.Addr().Is4() == to.Addr().Is4() || otherSocket != nil {
|
||||||
buffs, ok := writeBuffsByDest[to]
|
buffs, ok := writeBuffsByDest[to]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -831,6 +845,9 @@ func (s *Server) packetReadLoop(readFromSocket, otherSocket batching.Conn, readF
|
|||||||
}
|
}
|
||||||
delete(writeBuffsByDest, dest)
|
delete(writeBuffsByDest, dest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.metrics.countForwarded(readFromSocketIsIPv4, false, dataMetricsByOutAF[0].bytes, dataMetricsByOutAF[0].packets)
|
||||||
|
s.metrics.countForwarded(readFromSocketIsIPv4, true, dataMetricsByOutAF[1].bytes, dataMetricsByOutAF[1].packets)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user