From e75f13bd93bd154e4e3e6c62c69ccae68863f2b7 Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Wed, 15 Oct 2025 14:57:32 -0700 Subject: [PATCH] net/connstats: prepare to remove package (#17554) The connstats package was an unnecessary layer of indirection. It was seperated out of wgengine/netlog so that net/tstun and wgengine/magicsock wouldn't need a depenedency on the concrete implementation of network flow logging. Instead, we simply register a callback for counting connections. This PR does the bare minimum work to prepare tstun and magicsock to only care about that callback. A future PR will delete connstats and merge it into netlog. Updates tailscale/corp#33352 Signed-off-by: Joe Tsai --- cmd/k8s-operator/depaware.txt | 3 +- cmd/tailscaled/depaware-min.txt | 2 +- cmd/tailscaled/depaware-minbox.txt | 2 +- cmd/tailscaled/depaware.txt | 3 +- cmd/tsidp/depaware.txt | 3 +- net/connstats/stats.go | 38 ++++++++++---------- net/tstun/wrap.go | 34 +++++++++++------- net/tstun/wrap_test.go | 15 ++++---- tsnet/depaware.txt | 3 +- types/netlogfunc/netlogfunc.go | 15 ++++++++ types/netlogtype/netlogtype.go | 42 ++++++++++++++++++++++ wgengine/magicsock/derp.go | 4 +-- wgengine/magicsock/endpoint.go | 8 ++--- wgengine/magicsock/magicsock.go | 16 ++++----- wgengine/magicsock/magicsock_test.go | 52 +++++++++++++--------------- wgengine/netlog/netlog.go | 43 +++++++++-------------- 16 files changed, 170 insertions(+), 113 deletions(-) create mode 100644 types/netlogfunc/netlogfunc.go diff --git a/cmd/k8s-operator/depaware.txt b/cmd/k8s-operator/depaware.txt index d4fdb87fc..8a8397f28 100644 --- a/cmd/k8s-operator/depaware.txt +++ b/cmd/k8s-operator/depaware.txt @@ -768,7 +768,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/ tailscale.com/net/bakedroots from tailscale.com/net/tlsdial+ 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock tailscale.com/net/captivedetection from tailscale.com/ipn/ipnlocal+ - tailscale.com/net/connstats from tailscale.com/net/tstun+ + tailscale.com/net/connstats from tailscale.com/wgengine/netlog tailscale.com/net/dns from tailscale.com/ipn/ipnlocal+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/cmd/k8s-operator+ @@ -834,6 +834,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/ tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/ipn/ipnlocal+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netlogtype from tailscale.com/net/connstats+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/ipn/localapi+ diff --git a/cmd/tailscaled/depaware-min.txt b/cmd/tailscaled/depaware-min.txt index fe50dface..96e18db43 100644 --- a/cmd/tailscaled/depaware-min.txt +++ b/cmd/tailscaled/depaware-min.txt @@ -78,7 +78,6 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/logtail/filch from tailscale.com/log/sockstatlog+ tailscale.com/net/bakedroots from tailscale.com/net/tlsdial 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock - tailscale.com/net/connstats from tailscale.com/net/tstun+ tailscale.com/net/dns from tailscale.com/cmd/tailscaled+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/net/dns+ @@ -132,6 +131,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/cmd/tailscaled+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/net/batching+ tailscale.com/types/opt from tailscale.com/control/controlknobs+ diff --git a/cmd/tailscaled/depaware-minbox.txt b/cmd/tailscaled/depaware-minbox.txt index a4999825e..d46180e2d 100644 --- a/cmd/tailscaled/depaware-minbox.txt +++ b/cmd/tailscaled/depaware-minbox.txt @@ -102,7 +102,6 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/net/ace from tailscale.com/cmd/tailscale/cli tailscale.com/net/bakedroots from tailscale.com/net/tlsdial 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock - tailscale.com/net/connstats from tailscale.com/net/tstun+ tailscale.com/net/dns from tailscale.com/cmd/tailscaled+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/net/dns+ @@ -158,6 +157,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/cmd/tailscaled+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/net/batching+ tailscale.com/types/opt from tailscale.com/control/controlknobs+ diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index 6ca10f80c..eed40845c 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -330,7 +330,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/net/bakedroots from tailscale.com/net/tlsdial+ 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock+ tailscale.com/net/captivedetection from tailscale.com/ipn/ipnlocal+ - tailscale.com/net/connstats from tailscale.com/net/tstun+ + tailscale.com/net/connstats from tailscale.com/wgengine/netlog tailscale.com/net/dns from tailscale.com/cmd/tailscaled+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/net/dns+ @@ -401,6 +401,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/cmd/tailscaled+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netlogtype from tailscale.com/net/connstats+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/ipn/localapi+ diff --git a/cmd/tsidp/depaware.txt b/cmd/tsidp/depaware.txt index 894b4a078..1b6bb6d63 100644 --- a/cmd/tsidp/depaware.txt +++ b/cmd/tsidp/depaware.txt @@ -174,7 +174,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar tailscale.com/net/bakedroots from tailscale.com/ipn/ipnlocal+ 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock tailscale.com/net/captivedetection from tailscale.com/ipn/ipnlocal+ - tailscale.com/net/connstats from tailscale.com/net/tstun+ + tailscale.com/net/connstats from tailscale.com/wgengine/netlog tailscale.com/net/dns from tailscale.com/ipn/ipnlocal+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/net/dns+ @@ -239,6 +239,7 @@ tailscale.com/cmd/tsidp dependencies: (generated by github.com/tailscale/depawar tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/ipn/ipnlocal+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netlogtype from tailscale.com/net/connstats+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/ipn/localapi+ diff --git a/net/connstats/stats.go b/net/connstats/stats.go index 44b276254..206181b27 100644 --- a/net/connstats/stats.go +++ b/net/connstats/stats.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/errgroup" "tailscale.com/net/packet" "tailscale.com/net/tsaddr" + "tailscale.com/types/ipproto" "tailscale.com/types/netlogtype" ) @@ -85,14 +86,18 @@ func NewStatistics(maxPeriod time.Duration, maxConns int, dump func(start, end t // The source and destination of the packet directly correspond with // the source and destination in netlogtype.Connection. func (s *Statistics) UpdateTxVirtual(b []byte) { - s.updateVirtual(b, false) + var p packet.Parsed + p.Decode(b) + s.UpdateVirtual(p.IPProto, p.Src, p.Dst, 1, len(b), false) } // UpdateRxVirtual updates the counters for a received IP packet. // The source and destination of the packet are inverted with respect to // the source and destination in netlogtype.Connection. func (s *Statistics) UpdateRxVirtual(b []byte) { - s.updateVirtual(b, true) + var p packet.Parsed + p.Decode(b) + s.UpdateVirtual(p.IPProto, p.Dst, p.Src, 1, len(b), true) } var ( @@ -100,23 +105,18 @@ var ( tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6() ) -func (s *Statistics) updateVirtual(b []byte, receive bool) { - var p packet.Parsed - p.Decode(b) - conn := netlogtype.Connection{Proto: p.IPProto, Src: p.Src, Dst: p.Dst} - if receive { - conn.Src, conn.Dst = conn.Dst, conn.Src - } - +func (s *Statistics) UpdateVirtual(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) { // Network logging is defined as traffic between two Tailscale nodes. // Traffic with the internal Tailscale service is not with another node // and should not be logged. It also happens to be a high volume // amount of discrete traffic flows (e.g., DNS lookups). - switch conn.Dst.Addr() { + switch dst.Addr() { case tailscaleServiceIPv4, tailscaleServiceIPv6: return } + conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} + s.mu.Lock() defer s.mu.Unlock() cnts, found := s.virtual[conn] @@ -124,11 +124,11 @@ func (s *Statistics) updateVirtual(b []byte, receive bool) { return } if receive { - cnts.RxPackets++ - cnts.RxBytes += uint64(len(b)) + cnts.RxPackets += uint64(packets) + cnts.RxBytes += uint64(bytes) } else { - cnts.TxPackets++ - cnts.TxBytes += uint64(len(b)) + cnts.TxPackets += uint64(packets) + cnts.TxBytes += uint64(bytes) } s.virtual[conn] = cnts } @@ -138,7 +138,7 @@ func (s *Statistics) updateVirtual(b []byte, receive bool) { // The dst is a remote IP address and port that corresponds // with some physical peer backing the Tailscale IP address. func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) { - s.updatePhysical(src, dst, packets, bytes, false) + s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, false) } // UpdateRxPhysical updates the counters for zero or more received wireguard packets. @@ -146,11 +146,11 @@ func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packet // The dst is a remote IP address and port that corresponds // with some physical peer backing the Tailscale IP address. func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) { - s.updatePhysical(src, dst, packets, bytes, true) + s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, true) } -func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int, receive bool) { - conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst} +func (s *Statistics) UpdatePhysical(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) { + conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst} s.mu.Lock() defer s.mu.Unlock() diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index fb93ca21e..dfbab7812 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -24,7 +24,6 @@ import ( "go4.org/mem" "tailscale.com/disco" "tailscale.com/feature/buildfeatures" - "tailscale.com/net/connstats" "tailscale.com/net/packet" "tailscale.com/net/packet/checksum" "tailscale.com/net/tsaddr" @@ -33,6 +32,7 @@ import ( "tailscale.com/types/ipproto" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/types/netlogfunc" "tailscale.com/util/clientmetric" "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" @@ -203,8 +203,8 @@ type Wrapper struct { // disableTSMPRejected disables TSMP rejected responses. For tests. disableTSMPRejected bool - // stats maintains per-connection counters. - stats atomic.Pointer[connstats.Statistics] + // connCounter maintains per-connection counters. + connCounter syncs.AtomicValue[netlogfunc.ConnectionCounter] captureHook syncs.AtomicValue[packet.CaptureCallback] @@ -977,8 +977,8 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) { } sizes[buffsPos] = n if buildfeatures.HasConnStats { - if stats := t.stats.Load(); stats != nil { - stats.UpdateTxVirtual(p.Buffer()) + if update := t.connCounter.Load(); update != nil { + updateConnCounter(update, p.Buffer(), false) } } buffsPos++ @@ -1106,9 +1106,9 @@ func (t *Wrapper) injectedRead(res tunInjectedRead, outBuffs [][]byte, sizes []i } if buildfeatures.HasConnStats { - if stats := t.stats.Load(); stats != nil { + if update := t.connCounter.Load(); update != nil { for i := 0; i < n; i++ { - stats.UpdateTxVirtual(outBuffs[i][offset : offset+sizes[i]]) + updateConnCounter(update, outBuffs[i][offset:offset+sizes[i]], false) } } } @@ -1276,9 +1276,9 @@ func (t *Wrapper) Write(buffs [][]byte, offset int) (int, error) { func (t *Wrapper) tdevWrite(buffs [][]byte, offset int) (int, error) { if buildfeatures.HasConnStats { - if stats := t.stats.Load(); stats != nil { + if update := t.connCounter.Load(); update != nil { for i := range buffs { - stats.UpdateRxVirtual((buffs)[i][offset:]) + updateConnCounter(update, buffs[i][offset:], true) } } } @@ -1498,11 +1498,11 @@ func (t *Wrapper) Unwrap() tun.Device { return t.tdev } -// SetStatistics specifies a per-connection statistics aggregator. +// SetConnectionCounter specifies a per-connection statistics aggregator. // Nil may be specified to disable statistics gathering. -func (t *Wrapper) SetStatistics(stats *connstats.Statistics) { +func (t *Wrapper) SetConnectionCounter(fn netlogfunc.ConnectionCounter) { if buildfeatures.HasConnStats { - t.stats.Store(stats) + t.connCounter.Store(fn) } } @@ -1524,3 +1524,13 @@ func (t *Wrapper) InstallCaptureHook(cb packet.CaptureCallback) { } t.captureHook.Store(cb) } + +func updateConnCounter(update netlogfunc.ConnectionCounter, b []byte, receive bool) { + var p packet.Parsed + p.Decode(b) + if receive { + update(p.IPProto, p.Dst, p.Src, 1, len(b), true) + } else { + update(p.IPProto, p.Src, p.Dst, 1, len(b), false) + } +} diff --git a/net/tstun/wrap_test.go b/net/tstun/wrap_test.go index 223ee34f4..a66888191 100644 --- a/net/tstun/wrap_test.go +++ b/net/tstun/wrap_test.go @@ -5,7 +5,6 @@ package tstun import ( "bytes" - "context" "encoding/binary" "encoding/hex" "expvar" @@ -27,7 +26,6 @@ import ( "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/tcpip/stack" "tailscale.com/disco" - "tailscale.com/net/connstats" "tailscale.com/net/netaddr" "tailscale.com/net/packet" "tailscale.com/tstest" @@ -370,9 +368,8 @@ func TestFilter(t *testing.T) { }() var buf [MaxPacketSize]byte - stats := connstats.NewStatistics(0, 0, nil) - defer stats.Shutdown(context.Background()) - tun.SetStatistics(stats) + var stats netlogtype.CountsByConnection + tun.SetConnectionCounter(stats.Add) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var n int @@ -380,9 +377,10 @@ func TestFilter(t *testing.T) { var filtered bool sizes := make([]int, 1) - tunStats, _ := stats.TestExtract() + tunStats := stats.Clone() + stats.Reset() if len(tunStats) > 0 { - t.Errorf("connstats.Statistics.Extract = %v, want {}", stats) + t.Errorf("connstats.Statistics.Extract = %v, want {}", tunStats) } if tt.dir == in { @@ -415,7 +413,8 @@ func TestFilter(t *testing.T) { } } - got, _ := stats.TestExtract() + got := stats.Clone() + stats.Reset() want := map[netlogtype.Connection]netlogtype.Counts{} var wasUDP bool if !tt.drop { diff --git a/tsnet/depaware.txt b/tsnet/depaware.txt index d602c7b2f..893e52f2c 100644 --- a/tsnet/depaware.txt +++ b/tsnet/depaware.txt @@ -170,7 +170,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware) tailscale.com/net/bakedroots from tailscale.com/ipn/ipnlocal+ 💣 tailscale.com/net/batching from tailscale.com/wgengine/magicsock tailscale.com/net/captivedetection from tailscale.com/ipn/ipnlocal+ - tailscale.com/net/connstats from tailscale.com/net/tstun+ + tailscale.com/net/connstats from tailscale.com/wgengine/netlog tailscale.com/net/dns from tailscale.com/ipn/ipnlocal+ tailscale.com/net/dns/publicdns from tailscale.com/net/dns+ tailscale.com/net/dns/resolvconffile from tailscale.com/net/dns+ @@ -234,6 +234,7 @@ tailscale.com/tsnet dependencies: (generated by github.com/tailscale/depaware) tailscale.com/types/logger from tailscale.com/appc+ tailscale.com/types/logid from tailscale.com/ipn/ipnlocal+ tailscale.com/types/mapx from tailscale.com/ipn/ipnext + tailscale.com/types/netlogfunc from tailscale.com/net/tstun+ tailscale.com/types/netlogtype from tailscale.com/net/connstats+ tailscale.com/types/netmap from tailscale.com/control/controlclient+ tailscale.com/types/nettype from tailscale.com/ipn/localapi+ diff --git a/types/netlogfunc/netlogfunc.go b/types/netlogfunc/netlogfunc.go new file mode 100644 index 000000000..6185fcb71 --- /dev/null +++ b/types/netlogfunc/netlogfunc.go @@ -0,0 +1,15 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package netlogfunc defines types for network logging. +package netlogfunc + +import ( + "net/netip" + + "tailscale.com/types/ipproto" +) + +// ConnectionCounter is a function for counting packets and bytes +// for a particular connection. +type ConnectionCounter func(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) diff --git a/types/netlogtype/netlogtype.go b/types/netlogtype/netlogtype.go index 0f552611e..a29ea6f03 100644 --- a/types/netlogtype/netlogtype.go +++ b/types/netlogtype/netlogtype.go @@ -5,7 +5,9 @@ package netlogtype import ( + "maps" "net/netip" + "sync" "time" "tailscale.com/tailcfg" @@ -83,3 +85,43 @@ func (c1 Counts) Add(c2 Counts) Counts { c1.RxBytes += c2.RxBytes return c1 } + +// CountsByConnection is a count of packets and bytes for each connection. +// All methods are safe for concurrent calls. +type CountsByConnection struct { + mu sync.Mutex + m map[Connection]Counts +} + +// Add adds packets and bytes for the specified connection. +func (c *CountsByConnection) Add(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, recv bool) { + conn := Connection{Proto: proto, Src: src, Dst: dst} + c.mu.Lock() + defer c.mu.Unlock() + if c.m == nil { + c.m = make(map[Connection]Counts) + } + cnts := c.m[conn] + if recv { + cnts.RxPackets += uint64(packets) + cnts.RxBytes += uint64(bytes) + } else { + cnts.TxPackets += uint64(packets) + cnts.TxBytes += uint64(bytes) + } + c.m[conn] = cnts +} + +// Clone deep copies the map. +func (c *CountsByConnection) Clone() map[Connection]Counts { + c.mu.Lock() + defer c.mu.Unlock() + return maps.Clone(c.m) +} + +// Reset clear the map. +func (c *CountsByConnection) Reset() { + c.mu.Lock() + defer c.mu.Unlock() + clear(c.m) +} diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index d33745892..37a4f1a64 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -717,8 +717,8 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en } ep.noteRecvActivity(srcAddr, mono.Now()) - if stats := c.stats.Load(); stats != nil { - stats.UpdateRxPhysical(ep.nodeAddr, srcAddr.ap, 1, dm.n) + if update := c.connCounter.Load(); update != nil { + update(0, netip.AddrPortFrom(ep.nodeAddr, 0), srcAddr.ap, 1, dm.n, true) } c.metrics.inboundPacketsDERPTotal.Add(1) diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 7deafb752..2010775a1 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -1105,8 +1105,8 @@ func (de *endpoint) send(buffs [][]byte, offset int) error { } // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. - if stats := de.c.stats.Load(); err == nil && stats != nil { - stats.UpdateTxPhysical(de.nodeAddr, udpAddr.ap, len(buffs), txBytes) + if update := de.c.connCounter.Load(); err == nil && update != nil { + update(0, netip.AddrPortFrom(de.nodeAddr, 0), udpAddr.ap, len(buffs), txBytes, false) } } if derpAddr.IsValid() { @@ -1123,8 +1123,8 @@ func (de *endpoint) send(buffs [][]byte, offset int) error { } } - if stats := de.c.stats.Load(); stats != nil { - stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buffs), txBytes) + if update := de.c.connCounter.Load(); update != nil { + update(0, netip.AddrPortFrom(de.nodeAddr, 0), derpAddr, len(buffs), txBytes, false) } if allOk { return nil diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index f855936ce..61fc50d12 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -37,7 +37,6 @@ import ( "tailscale.com/hostinfo" "tailscale.com/ipn/ipnstate" "tailscale.com/net/batching" - "tailscale.com/net/connstats" "tailscale.com/net/netcheck" "tailscale.com/net/neterror" "tailscale.com/net/netmon" @@ -56,6 +55,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/lazy" "tailscale.com/types/logger" + "tailscale.com/types/netlogfunc" "tailscale.com/types/netmap" "tailscale.com/types/nettype" "tailscale.com/types/views" @@ -261,8 +261,8 @@ type Conn struct { //lint:ignore U1000 used on Linux/Darwin only peerMTUEnabled atomic.Bool - // stats maintains per-connection counters. - stats atomic.Pointer[connstats.Statistics] + // connCounter maintains per-connection counters. + connCounter syncs.AtomicValue[netlogfunc.ConnectionCounter] // captureHook, if non-nil, is the pcap logging callback when capturing. captureHook syncs.AtomicValue[packet.CaptureCallback] @@ -1862,8 +1862,8 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *epAddrEndpointCach ep.lastRecvUDPAny.StoreAtomic(now) connNoted := ep.noteRecvActivity(src, now) if buildfeatures.HasConnStats { - if stats := c.stats.Load(); stats != nil { - stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, geneveInclusivePacketLen) + if update := c.connCounter.Load(); update != nil { + update(0, netip.AddrPortFrom(ep.nodeAddr, 0), ipp, 1, geneveInclusivePacketLen, true) } } if src.vni.IsSet() && (connNoted || looksLikeInitiationMsg(b)) { @@ -3745,11 +3745,11 @@ func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) { }) } -// SetStatistics specifies a per-connection statistics aggregator. +// SetConnectionCounter specifies a per-connection statistics aggregator. // Nil may be specified to disable statistics gathering. -func (c *Conn) SetStatistics(stats *connstats.Statistics) { +func (c *Conn) SetConnectionCounter(fn netlogfunc.ConnectionCounter) { if buildfeatures.HasConnStats { - c.stats.Store(stats) + c.connCounter.Store(fn) } } diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index d1d62a26e..60620b141 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -32,6 +32,7 @@ import ( "unsafe" qt "github.com/frankban/quicktest" + "github.com/google/go-cmp/cmp" wgconn "github.com/tailscale/wireguard-go/conn" "github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/tun/tuntest" @@ -45,7 +46,6 @@ import ( "tailscale.com/envknob" "tailscale.com/health" "tailscale.com/ipn/ipnstate" - "tailscale.com/net/connstats" "tailscale.com/net/netaddr" "tailscale.com/net/netcheck" "tailscale.com/net/netmon" @@ -158,14 +158,14 @@ func runDERPAndStun(t *testing.T, logf logger.Logf, l nettype.PacketListener, st // happiness. type magicStack struct { privateKey key.NodePrivate - epCh chan []tailcfg.Endpoint // endpoint updates produced by this peer - stats *connstats.Statistics // per-connection statistics - conn *Conn // the magicsock itself - tun *tuntest.ChannelTUN // TUN device to send/receive packets - tsTun *tstun.Wrapper // wrapped tun that implements filtering and wgengine hooks - dev *device.Device // the wireguard-go Device that connects the previous things - wgLogger *wglog.Logger // wireguard-go log wrapper - netMon *netmon.Monitor // always non-nil + epCh chan []tailcfg.Endpoint // endpoint updates produced by this peer + counts netlogtype.CountsByConnection // per-connection statistics + conn *Conn // the magicsock itself + tun *tuntest.ChannelTUN // TUN device to send/receive packets + tsTun *tstun.Wrapper // wrapped tun that implements filtering and wgengine hooks + dev *device.Device // the wireguard-go Device that connects the previous things + wgLogger *wglog.Logger // wireguard-go log wrapper + netMon *netmon.Monitor // always non-nil metrics *usermetric.Registry } @@ -1143,22 +1143,19 @@ func testTwoDevicePing(t *testing.T, d *devices) { } } - m1.stats = connstats.NewStatistics(0, 0, nil) - defer m1.stats.Shutdown(context.Background()) - m1.conn.SetStatistics(m1.stats) - m2.stats = connstats.NewStatistics(0, 0, nil) - defer m2.stats.Shutdown(context.Background()) - m2.conn.SetStatistics(m2.stats) + m1.conn.SetConnectionCounter(m1.counts.Add) + m2.conn.SetConnectionCounter(m2.counts.Add) checkStats := func(t *testing.T, m *magicStack, wantConns []netlogtype.Connection) { - _, stats := m.stats.TestExtract() + defer m.counts.Reset() + counts := m.counts.Clone() for _, conn := range wantConns { - if _, ok := stats[conn]; ok { + if _, ok := counts[conn]; ok { return } } t.Helper() - t.Errorf("missing any connection to %s from %s", wantConns, slicesx.MapKeys(stats)) + t.Errorf("missing any connection to %s from %s", wantConns, slicesx.MapKeys(counts)) } addrPort := netip.MustParseAddrPort @@ -1221,9 +1218,9 @@ func testTwoDevicePing(t *testing.T, d *devices) { setT(t) defer setT(outerT) m1.conn.resetMetricsForTest() - m1.stats.TestExtract() + m1.counts.Reset() m2.conn.resetMetricsForTest() - m2.stats.TestExtract() + m2.counts.Reset() t.Logf("Metrics before: %s\n", m1.metrics.String()) ping1(t) ping2(t) @@ -1249,8 +1246,6 @@ func (c *Conn) resetMetricsForTest() { } func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { - _, phys := ms.stats.TestExtract() - physIPv4RxBytes := int64(0) physIPv4TxBytes := int64(0) physDERPRxBytes := int64(0) @@ -1259,7 +1254,7 @@ func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { physIPv4TxPackets := int64(0) physDERPRxPackets := int64(0) physDERPTxPackets := int64(0) - for conn, count := range phys { + for conn, count := range ms.counts.Clone() { t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String()) if conn.Dst.String() == "127.3.3.40:1" { physDERPRxBytes += int64(count.RxBytes) @@ -1273,6 +1268,7 @@ func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { physIPv4TxPackets += int64(count.TxPackets) } } + ms.counts.Reset() metricIPv4RxBytes := ms.conn.metrics.inboundBytesIPv4Total.Value() metricIPv4RxPackets := ms.conn.metrics.inboundPacketsIPv4Total.Value() @@ -3986,7 +3982,8 @@ func TestConn_receiveIP(t *testing.T) { c.noteRecvActivity = func(public key.NodePublic) { noteRecvActivityCalled = true } - c.SetStatistics(connstats.NewStatistics(0, 0, nil)) + var counts netlogtype.CountsByConnection + c.SetConnectionCounter(counts.Add) if tt.insertWantEndpointTypeInPeerMap { var insertEPIntoPeerMap *endpoint @@ -4059,9 +4056,8 @@ func TestConn_receiveIP(t *testing.T) { } // Verify physical rx stats - stats := c.stats.Load() - _, gotPhy := stats.TestExtract() wantNonzeroRxStats := false + gotPhy := counts.Clone() switch ep := tt.wantEndpointType.(type) { case *lazyEndpoint: if ep.maybeEP != nil { @@ -4081,8 +4077,8 @@ func TestConn_receiveIP(t *testing.T) { RxBytes: wantRxBytes, }, } - if !reflect.DeepEqual(gotPhy, wantPhy) { - t.Errorf("receiveIP() got physical conn stats = %v, want %v", gotPhy, wantPhy) + if d := cmp.Diff(gotPhy, wantPhy); d != "" { + t.Errorf("receiveIP() stats mismatch (-got +want):\n%s", d) } } else { if len(gotPhy) != 0 { diff --git a/wgengine/netlog/netlog.go b/wgengine/netlog/netlog.go index 7e1938d27..a04fd2126 100644 --- a/wgengine/netlog/netlog.go +++ b/wgengine/netlog/netlog.go @@ -8,6 +8,7 @@ package netlog import ( + "cmp" "context" "encoding/json" "errors" @@ -19,7 +20,6 @@ import ( "sync" "time" - "tailscale.com/feature/buildfeatures" "tailscale.com/health" "tailscale.com/logpolicy" "tailscale.com/logtail" @@ -29,6 +29,7 @@ import ( "tailscale.com/net/tsaddr" "tailscale.com/tailcfg" "tailscale.com/types/logid" + "tailscale.com/types/netlogfunc" "tailscale.com/types/netlogtype" "tailscale.com/util/eventbus" "tailscale.com/wgengine/router" @@ -40,12 +41,12 @@ const pollPeriod = 5 * time.Second // Device is an abstraction over a tunnel device or a magic socket. // Both *tstun.Wrapper and *magicsock.Conn implement this interface. type Device interface { - SetStatistics(*connstats.Statistics) + SetConnectionCounter(netlogfunc.ConnectionCounter) } type noopDevice struct{} -func (noopDevice) SetStatistics(*connstats.Statistics) {} +func (noopDevice) SetConnectionCounter(netlogfunc.ConnectionCounter) {} // Logger logs statistics about every connection. // At present, it only logs connections within a tailscale network. @@ -131,31 +132,21 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo // can upload to the Tailscale log service, so stay below this limit. const maxLogSize = 256 << 10 const maxConns = (maxLogSize - netlogtype.MaxMessageJSONSize) / netlogtype.MaxConnectionCountsJSONSize - if buildfeatures.HasConnStats { - nl.stats = connstats.NewStatistics(pollPeriod, maxConns, func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) { - nl.mu.Lock() - addrs := nl.addrs - prefixes := nl.prefixes - nl.mu.Unlock() - recordStatistics(nl.logger, nodeID, start, end, virtual, physical, addrs, prefixes, logExitFlowEnabledEnabled) - }) - } + nl.stats = connstats.NewStatistics(pollPeriod, maxConns, func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) { + nl.mu.Lock() + addrs := nl.addrs + prefixes := nl.prefixes + nl.mu.Unlock() + recordStatistics(nl.logger, nodeID, start, end, virtual, physical, addrs, prefixes, logExitFlowEnabledEnabled) + }) // Register the connection tracker into the TUN device. - if tun == nil { - tun = noopDevice{} - } - nl.tun = tun - if buildfeatures.HasConnStats { - nl.tun.SetStatistics(nl.stats) - } + nl.tun = cmp.Or[Device](tun, noopDevice{}) + nl.tun.SetConnectionCounter(nl.stats.UpdateVirtual) // Register the connection tracker into magicsock. - if sock == nil { - sock = noopDevice{} - } - nl.sock = sock - nl.sock.SetStatistics(nl.stats) + nl.sock = cmp.Or[Device](sock, noopDevice{}) + nl.sock.SetConnectionCounter(nl.stats.UpdatePhysical) return nil } @@ -265,8 +256,8 @@ func (nl *Logger) Shutdown(ctx context.Context) error { // Shutdown in reverse order of Startup. // Do not hold lock while shutting down since this may flush one last time. nl.mu.Unlock() - nl.sock.SetStatistics(nil) - nl.tun.SetStatistics(nil) + nl.sock.SetConnectionCounter(nil) + nl.tun.SetConnectionCounter(nil) err1 := nl.stats.Shutdown(ctx) err2 := nl.logger.Shutdown(ctx) nl.mu.Lock()