From 8476aa8dd5304b5cd97ee8869d9120588feea313 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Mon, 15 Dec 2025 19:30:24 -0800 Subject: [PATCH] net/udprelay: expose peer relay metrics Adding both user and client metrics for peer relay forwarded bytes and packets, and the total endpoints gauge. User metrics: tailscaled_relay_forwarded_packets_total{transport_in, transport_out} tailscaled_relay_forwarded_bytes_total{transport_in, transport_out} tailscaled_relay_endpoints_total{} Where the transport labels can be of "udp4" or "udp6". Client metrics: udprelay_forwarded_(packets|bytes)_udp(4|6)_udp(4|6) udprelay_endpoints RELNOTE: Expose tailscaled metrics for peer relay. Updates tailscale/corp#30820 Change-Id: I1a905d15bdc5ee84e28017e0b93210e2d9660259 Signed-off-by: Alex Valiushko --- feature/relayserver/relayserver.go | 2 +- net/udprelay/metrics.go | 130 +++++++++++++++++++++++++++ net/udprelay/metrics_test.go | 140 +++++++++++++++++++++++++++++ net/udprelay/server.go | 13 ++- net/udprelay/server_test.go | 10 ++- 5 files changed, 291 insertions(+), 4 deletions(-) create mode 100644 net/udprelay/metrics.go create mode 100644 net/udprelay/metrics_test.go diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index 4f23ae18e..b29a6abed 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -70,7 +70,7 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r * func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, error) { e := &extension{ newServerFn: func(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (relayServer, error) { - return udprelay.NewServer(logf, port, onlyStaticAddrPorts) + return udprelay.NewServer(logf, port, onlyStaticAddrPorts, sb.Sys().UserMetricsRegistry()) }, logf: logger.WithPrefix(logf, featureName+": "), } diff --git a/net/udprelay/metrics.go b/net/udprelay/metrics.go new file mode 100644 index 000000000..3bd12edd6 --- 
/dev/null +++ b/net/udprelay/metrics.go @@ -0,0 +1,130 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package udprelay + +import ( + "expvar" + "net/netip" + + "tailscale.com/util/clientmetric" + "tailscale.com/util/usermetric" +) + +var ( + metricForwarded44Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp4_udp4") + metricForwarded46Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp4_udp6") + metricForwarded64Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp6_udp4") + metricForwarded66Packets = clientmetric.NewCounter("udprelay_forwarded_packets_udp6_udp6") + + metricForwarded44Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp4_udp4") + metricForwarded46Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp4_udp6") + metricForwarded64Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp6_udp4") + metricForwarded66Bytes = clientmetric.NewCounter("udprelay_forwarded_bytes_udp6_udp6") + + metricEndpoints = clientmetric.NewGauge("udprelay_endpoints") +) + +type transport string + +const ( + transportUDP4 transport = "udp4" + transportUDP6 transport = "udp6" +) + +type forwardedLabel struct { + transportIn transport `prom:"transport_in"` + transportOut transport `prom:"transport_out"` +} + +type endpointLabel struct { +} + +type metrics struct { + forwarded44Packets expvar.Int + forwarded46Packets expvar.Int + forwarded64Packets expvar.Int + forwarded66Packets expvar.Int + + forwarded44Bytes expvar.Int + forwarded46Bytes expvar.Int + forwarded64Bytes expvar.Int + forwarded66Bytes expvar.Int + + endpoints expvar.Int +} + +// registerMetrics publishes user metric counters for peer relay server. 
+func registerMetrics(reg *usermetric.Registry) *metrics { + var ( + forwardedPackets = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel]( + reg, + "tailscaled_relay_forwarded_packets_total", + "counter", + "Counts the number of packets forwarded via Peer Relay", + ) + forwardedBytes = usermetric.NewMultiLabelMapWithRegistry[forwardedLabel]( + reg, + "tailscaled_relay_forwarded_bytes_total", + "counter", + "Counts the number of bytes forwarded via Peer Relay", + ) + endpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel]( + reg, + "tailscaled_relay_endpoints_total", + "gauge", + "Renders the current number of registered Peer Relay endpoints", + ) + forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4} + forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6} + forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4} + forwarded66 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP6} + m = new(metrics) + ) + + // Publish user metrics. 
+ forwardedPackets.Set(forwarded44, &m.forwarded44Packets) + forwardedPackets.Set(forwarded46, &m.forwarded46Packets) + forwardedPackets.Set(forwarded64, &m.forwarded64Packets) + forwardedPackets.Set(forwarded66, &m.forwarded66Packets) + + forwardedBytes.Set(forwarded44, &m.forwarded44Bytes) + forwardedBytes.Set(forwarded46, &m.forwarded46Bytes) + forwardedBytes.Set(forwarded64, &m.forwarded64Bytes) + forwardedBytes.Set(forwarded66, &m.forwarded66Bytes) + + endpoints.Set(endpointLabel{}, &m.endpoints) + + return m +} + +func (m *metrics) addEndpoints(value int) { + m.endpoints.Add(int64(value)) + metricEndpoints.Add(int64(value)) +} + +func (m *metrics) countForwarded(from, to netip.Addr, b []byte) { + in4, out4 := from.Is4(), to.Is4() + bytes := int64(len(b)) + if in4 && out4 { + m.forwarded44Packets.Add(1) + m.forwarded44Bytes.Add(bytes) + metricForwarded44Packets.Add(1) + metricForwarded44Bytes.Add(bytes) + } else if in4 && !out4 { + m.forwarded46Packets.Add(1) + m.forwarded46Bytes.Add(bytes) + metricForwarded46Packets.Add(1) + metricForwarded46Bytes.Add(bytes) + } else if !in4 && out4 { + m.forwarded64Packets.Add(1) + m.forwarded64Bytes.Add(bytes) + metricForwarded64Packets.Add(1) + metricForwarded64Bytes.Add(bytes) + } else { + m.forwarded66Packets.Add(1) + m.forwarded66Bytes.Add(bytes) + metricForwarded66Packets.Add(1) + metricForwarded66Bytes.Add(bytes) + } +} diff --git a/net/udprelay/metrics_test.go b/net/udprelay/metrics_test.go new file mode 100644 index 000000000..6ea0dbb28 --- /dev/null +++ b/net/udprelay/metrics_test.go @@ -0,0 +1,140 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package udprelay + +import ( + "net/netip" + "slices" + "testing" + + qt "github.com/frankban/quicktest" + "tailscale.com/util/usermetric" +) + +func resetClientMetrics() { + // clientmetrics are global and must be reset between test cases + // for the assertMetricsMatch to work. 
+ metricForwarded44Packets.Set(0) + metricForwarded46Packets.Set(0) + metricForwarded64Packets.Set(0) + metricForwarded66Packets.Set(0) + metricForwarded44Bytes.Set(0) + metricForwarded46Bytes.Set(0) + metricForwarded64Bytes.Set(0) + metricForwarded66Bytes.Set(0) + metricEndpoints.Set(0) +} + +func assertMetricsMatch(t *testing.T, s *Server) { + t.Helper() + s.mu.Lock() + defer s.mu.Unlock() + c := qt.New(t) + var ( + ps44, ps46, ps64, ps66 uint64 + bs44, bs46, bs64, bs66 uint64 + + es = len(s.serverEndpointByDisco) + ) + for _, e := range s.serverEndpointByDisco { + cs := e.extractClientInfo() + a, b := cs[0], cs[1] + a4, b4 := a.Endpoint.Addr().Is4(), b.Endpoint.Addr().Is4() + if a4 && b4 { + ps44 += b.PacketsTx + ps44 += a.PacketsTx + bs44 += b.BytesTx + bs44 += a.BytesTx + } else if a4 && !b4 { + ps46 += b.PacketsTx + ps64 += a.PacketsTx + bs46 += b.BytesTx + bs64 += a.BytesTx + } else if !a4 && b4 { + ps64 += b.PacketsTx + ps46 += a.PacketsTx + bs64 += b.BytesTx + bs46 += a.BytesTx + } else if !a4 && !b4 { + ps66 += b.PacketsTx + ps66 += a.PacketsTx + bs66 += b.BytesTx + bs66 += a.BytesTx + } + } + c.Assert(s.metrics.forwarded44Packets.Value(), qt.Equals, int64(ps44)) + c.Assert(s.metrics.forwarded46Packets.Value(), qt.Equals, int64(ps46)) + c.Assert(s.metrics.forwarded64Packets.Value(), qt.Equals, int64(ps64)) + c.Assert(s.metrics.forwarded66Packets.Value(), qt.Equals, int64(ps66)) + c.Assert(s.metrics.forwarded44Bytes.Value(), qt.Equals, int64(bs44)) + c.Assert(s.metrics.forwarded46Bytes.Value(), qt.Equals, int64(bs46)) + c.Assert(s.metrics.forwarded64Bytes.Value(), qt.Equals, int64(bs64)) + c.Assert(s.metrics.forwarded66Bytes.Value(), qt.Equals, int64(bs66)) + c.Assert(s.metrics.endpoints.Value(), qt.Equals, int64(es)) + + c.Assert(metricForwarded44Packets.Value(), qt.Equals, int64(ps44)) + c.Assert(metricForwarded46Packets.Value(), qt.Equals, int64(ps46)) + c.Assert(metricForwarded64Packets.Value(), qt.Equals, int64(ps64)) + 
c.Assert(metricForwarded66Packets.Value(), qt.Equals, int64(ps66)) + c.Assert(metricForwarded44Bytes.Value(), qt.Equals, int64(bs44)) + c.Assert(metricForwarded46Bytes.Value(), qt.Equals, int64(bs46)) + c.Assert(metricForwarded64Bytes.Value(), qt.Equals, int64(bs64)) + c.Assert(metricForwarded66Bytes.Value(), qt.Equals, int64(bs66)) + c.Assert(metricEndpoints.Value(), qt.Equals, int64(es)) +} + +func TestMetrics(t *testing.T) { + c := qt.New(t) + resetClientMetrics() + r := &usermetric.Registry{} + m := registerMetrics(r) + + // Expect certain prom names registered. + have := r.MetricNames() + want := []string{ + "tailscaled_relay_forwarded_packets_total", + "tailscaled_relay_forwarded_bytes_total", + "tailscaled_relay_endpoints_total", + } + slices.Sort(have) + slices.Sort(want) + c.Assert(have, qt.CmpEquals(), want) + + // Validate addEndpoints. + m.addEndpoints(1) + c.Assert(m.endpoints.Value(), qt.Equals, int64(1)) + c.Assert(metricEndpoints.Value(), qt.Equals, int64(1)) + m.addEndpoints(-1) + c.Assert(m.endpoints.Value(), qt.Equals, int64(0)) + c.Assert(metricEndpoints.Value(), qt.Equals, int64(0)) + + // Validate countForwarded. 
+ var ( + ip4 = netip.AddrFrom4([4]byte{1, 1, 1, 1}) + ip6 = netip.AddrFrom16([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) + ) + m.countForwarded(ip4, ip4, []byte{1}) + c.Assert(m.forwarded44Bytes.Value(), qt.Equals, int64(1)) + c.Assert(m.forwarded44Packets.Value(), qt.Equals, int64(1)) + c.Assert(metricForwarded44Bytes.Value(), qt.Equals, int64(1)) + c.Assert(metricForwarded44Packets.Value(), qt.Equals, int64(1)) + + m.countForwarded(ip4, ip6, []byte{1, 2}) + c.Assert(m.forwarded46Bytes.Value(), qt.Equals, int64(2)) + c.Assert(m.forwarded46Packets.Value(), qt.Equals, int64(1)) + c.Assert(metricForwarded46Bytes.Value(), qt.Equals, int64(2)) + c.Assert(metricForwarded46Packets.Value(), qt.Equals, int64(1)) + + m.countForwarded(ip6, ip4, []byte{1, 2, 3}) + c.Assert(m.forwarded64Bytes.Value(), qt.Equals, int64(3)) + c.Assert(m.forwarded64Packets.Value(), qt.Equals, int64(1)) + c.Assert(metricForwarded64Bytes.Value(), qt.Equals, int64(3)) + c.Assert(metricForwarded64Packets.Value(), qt.Equals, int64(1)) + + m.countForwarded(ip6, ip6, []byte{1, 2, 3, 4}) + c.Assert(m.forwarded66Bytes.Value(), qt.Equals, int64(4)) + c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(1)) + c.Assert(metricForwarded66Bytes.Value(), qt.Equals, int64(4)) + c.Assert(metricForwarded66Packets.Value(), qt.Equals, int64(1)) +} diff --git a/net/udprelay/server.go b/net/udprelay/server.go index 45127dfae..b967b8fce 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -43,6 +43,7 @@ import ( "tailscale.com/types/views" "tailscale.com/util/eventbus" "tailscale.com/util/set" + "tailscale.com/util/usermetric" ) const ( @@ -76,6 +77,7 @@ type Server struct { wg sync.WaitGroup closeCh chan struct{} netChecker *netcheck.Client + metrics *metrics mu sync.Mutex // guards the following fields macSecrets views.Slice[[blake2s.Size]byte] // [0] is most recent, max 2 elements @@ -321,7 +323,7 @@ func (e *serverEndpoint) isBoundLocked() bool { // onlyStaticAddrPorts is true, then 
dynamic addr:port discovery will be // disabled, and only addr:port's set via [Server.SetStaticAddrPorts] will be // used. -func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Server, err error) { +func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool, metrics *usermetric.Registry) (s *Server, err error) { s = &Server{ logf: logf, disco: key.NewDisco(), @@ -333,6 +335,7 @@ func NewServer(logf logger.Logf, port uint16, onlyStaticAddrPorts bool) (s *Serv nextVNI: minVNI, } s.discoPublic = s.disco.Public() + s.metrics = registerMetrics(metrics) // TODO(creachadair): Find a way to plumb this in during initialization. // As-written, messages published here will not be seen by other components @@ -670,6 +673,7 @@ func (s *Server) endpointGCLoop() { defer s.mu.Unlock() for k, v := range s.serverEndpointByDisco { if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) { + s.metrics.addEndpoints(-1) delete(s.serverEndpointByDisco, k) s.serverEndpointByVNI.Delete(v.vni) } @@ -715,7 +719,11 @@ func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to n secrets := s.getMACSecrets(now) return e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now) } - return e.(*serverEndpoint).handleDataPacket(from, b, now) + write, to = e.(*serverEndpoint).handleDataPacket(from, b, now) + if write != nil { + s.metrics.countForwarded(from.Addr(), to.Addr(), write) + } + return } func (s *Server) getMACSecrets(now mono.Time) views.Slice[[blake2s.Size]byte] { @@ -932,6 +940,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv s.serverEndpointByVNI.Store(e.vni, e) s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString()) + s.metrics.addEndpoints(1) return endpoint.ServerEndpoint{ ServerDisco: s.discoPublic, ClientDisco: pair.Get(), diff --git a/net/udprelay/server_test.go 
b/net/udprelay/server_test.go index c4b365641..d90bd50ce 100644 --- a/net/udprelay/server_test.go +++ b/net/udprelay/server_test.go @@ -21,6 +21,7 @@ import ( "tailscale.com/tstime/mono" "tailscale.com/types/key" "tailscale.com/types/views" + "tailscale.com/util/usermetric" ) type testClient struct { @@ -209,7 +210,9 @@ func TestServer(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - server, err := NewServer(t.Logf, 0, true) + reg := new(usermetric.Registry) + resetClientMetrics() + server, err := NewServer(t.Logf, 0, true, reg) if err != nil { t.Fatal(err) } @@ -234,6 +237,7 @@ func TestServer(t *testing.T) { } // We expect the same endpoint details pre-handshake. + assertMetricsMatch(t, server) if diff := cmp.Diff(dupEndpoint, endpoint, cmpopts.EquateComparable(netip.AddrPort{}, key.DiscoPublic{})); diff != "" { t.Fatalf("wrong dupEndpoint (-got +want)\n%s", diff) } @@ -285,6 +289,7 @@ func TestServer(t *testing.T) { t.Fatal(err) } // We expect the same endpoint details post-handshake. + assertMetricsMatch(t, server) if diff := cmp.Diff(dupEndpoint, endpoint, cmpopts.EquateComparable(netip.AddrPort{}, key.DiscoPublic{})); diff != "" { t.Fatalf("wrong dupEndpoint (-got +want)\n%s", diff) } @@ -308,6 +313,7 @@ func TestServer(t *testing.T) { defer tcAOnNewPort.close() // Handshake client A on a new source IP:port, verify we can send packets on the new binding + assertMetricsMatch(t, server) tcAOnNewPort.handshake(t) fromAOnNewPort := []byte{7, 8, 9} @@ -330,6 +336,8 @@ func TestServer(t *testing.T) { if !bytes.Equal(fromBOnNewPort, rxFromB) { t.Fatal("unexpected msg B->A") } + + assertMetricsMatch(t, server) }) } }