mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-31 08:11:32 +01:00 
			
		
		
		
	Saves only 12 KB, but notably removes some deps on packages that future changes can then eliminate entirely. Updates #12614 Change-Id: Ibf830d3ee08f621d0a2011b1d4cd175427ef50df Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
		
			
				
	
	
		
			225 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			225 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| //go:build !ts_omit_connstats
 | |
| 
 | |
| // Package connstats maintains statistics about connections
 | |
| // flowing through a TUN device (which operate at the IP layer).
 | |
| package connstats
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"net/netip"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/sync/errgroup"
 | |
| 	"tailscale.com/net/packet"
 | |
| 	"tailscale.com/net/tsaddr"
 | |
| 	"tailscale.com/types/netlogtype"
 | |
| )
 | |
| 
 | |
| // Statistics maintains counters for every connection.
 | |
| // All methods are safe for concurrent use.
 | |
| // The zero value is ready for use.
 | |
| type Statistics struct {
 | |
| 	maxConns int // immutable once set
 | |
| 
 | |
| 	mu sync.Mutex
 | |
| 	connCnts
 | |
| 
 | |
| 	connCntsCh  chan connCnts
 | |
| 	shutdownCtx context.Context
 | |
| 	shutdown    context.CancelFunc
 | |
| 	group       errgroup.Group
 | |
| }
 | |
| 
 | |
| type connCnts struct {
 | |
| 	start    time.Time
 | |
| 	end      time.Time
 | |
| 	virtual  map[netlogtype.Connection]netlogtype.Counts
 | |
| 	physical map[netlogtype.Connection]netlogtype.Counts
 | |
| }
 | |
| 
 | |
| // NewStatistics creates a data structure for tracking connection statistics
 | |
| // that periodically dumps the virtual and physical connection counts
 | |
| // depending on whether the maxPeriod or maxConns is exceeded.
 | |
| // The dump function is called from a single goroutine.
 | |
| // Shutdown must be called to cleanup resources.
 | |
| func NewStatistics(maxPeriod time.Duration, maxConns int, dump func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts)) *Statistics {
 | |
| 	s := &Statistics{maxConns: maxConns}
 | |
| 	s.connCntsCh = make(chan connCnts, 256)
 | |
| 	s.shutdownCtx, s.shutdown = context.WithCancel(context.Background())
 | |
| 	s.group.Go(func() error {
 | |
| 		// TODO(joetsai): Using a ticker is problematic on mobile platforms
 | |
| 		// where waking up a process every maxPeriod when there is no activity
 | |
| 		// is a drain on battery life. Switch this instead to instead use
 | |
| 		// a time.Timer that is triggered upon network activity.
 | |
| 		ticker := new(time.Ticker)
 | |
| 		if maxPeriod > 0 {
 | |
| 			ticker = time.NewTicker(maxPeriod)
 | |
| 			defer ticker.Stop()
 | |
| 		}
 | |
| 
 | |
| 		for {
 | |
| 			var cc connCnts
 | |
| 			select {
 | |
| 			case cc = <-s.connCntsCh:
 | |
| 			case <-ticker.C:
 | |
| 				cc = s.extract()
 | |
| 			case <-s.shutdownCtx.Done():
 | |
| 				cc = s.extract()
 | |
| 			}
 | |
| 			if len(cc.virtual)+len(cc.physical) > 0 && dump != nil {
 | |
| 				dump(cc.start, cc.end, cc.virtual, cc.physical)
 | |
| 			}
 | |
| 			if s.shutdownCtx.Err() != nil {
 | |
| 				return nil
 | |
| 			}
 | |
| 		}
 | |
| 	})
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // UpdateTxVirtual updates the counters for a transmitted IP packet
 | |
| // The source and destination of the packet directly correspond with
 | |
| // the source and destination in netlogtype.Connection.
 | |
| func (s *Statistics) UpdateTxVirtual(b []byte) {
 | |
| 	s.updateVirtual(b, false)
 | |
| }
 | |
| 
 | |
| // UpdateRxVirtual updates the counters for a received IP packet.
 | |
| // The source and destination of the packet are inverted with respect to
 | |
| // the source and destination in netlogtype.Connection.
 | |
| func (s *Statistics) UpdateRxVirtual(b []byte) {
 | |
| 	s.updateVirtual(b, true)
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	tailscaleServiceIPv4 = tsaddr.TailscaleServiceIP()
 | |
| 	tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6()
 | |
| )
 | |
| 
 | |
| func (s *Statistics) updateVirtual(b []byte, receive bool) {
 | |
| 	var p packet.Parsed
 | |
| 	p.Decode(b)
 | |
| 	conn := netlogtype.Connection{Proto: p.IPProto, Src: p.Src, Dst: p.Dst}
 | |
| 	if receive {
 | |
| 		conn.Src, conn.Dst = conn.Dst, conn.Src
 | |
| 	}
 | |
| 
 | |
| 	// Network logging is defined as traffic between two Tailscale nodes.
 | |
| 	// Traffic with the internal Tailscale service is not with another node
 | |
| 	// and should not be logged. It also happens to be a high volume
 | |
| 	// amount of discrete traffic flows (e.g., DNS lookups).
 | |
| 	switch conn.Dst.Addr() {
 | |
| 	case tailscaleServiceIPv4, tailscaleServiceIPv6:
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	cnts, found := s.virtual[conn]
 | |
| 	if !found && !s.preInsertConn() {
 | |
| 		return
 | |
| 	}
 | |
| 	if receive {
 | |
| 		cnts.RxPackets++
 | |
| 		cnts.RxBytes += uint64(len(b))
 | |
| 	} else {
 | |
| 		cnts.TxPackets++
 | |
| 		cnts.TxBytes += uint64(len(b))
 | |
| 	}
 | |
| 	s.virtual[conn] = cnts
 | |
| }
 | |
| 
 | |
| // UpdateTxPhysical updates the counters for zero or more transmitted wireguard packets.
 | |
| // The src is always a Tailscale IP address, representing some remote peer.
 | |
| // The dst is a remote IP address and port that corresponds
 | |
| // with some physical peer backing the Tailscale IP address.
 | |
| func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
 | |
| 	s.updatePhysical(src, dst, packets, bytes, false)
 | |
| }
 | |
| 
 | |
| // UpdateRxPhysical updates the counters for zero or more received wireguard packets.
 | |
| // The src is always a Tailscale IP address, representing some remote peer.
 | |
| // The dst is a remote IP address and port that corresponds
 | |
| // with some physical peer backing the Tailscale IP address.
 | |
| func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
 | |
| 	s.updatePhysical(src, dst, packets, bytes, true)
 | |
| }
 | |
| 
 | |
| func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int, receive bool) {
 | |
| 	conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst}
 | |
| 
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	cnts, found := s.physical[conn]
 | |
| 	if !found && !s.preInsertConn() {
 | |
| 		return
 | |
| 	}
 | |
| 	if receive {
 | |
| 		cnts.RxPackets += uint64(packets)
 | |
| 		cnts.RxBytes += uint64(bytes)
 | |
| 	} else {
 | |
| 		cnts.TxPackets += uint64(packets)
 | |
| 		cnts.TxBytes += uint64(bytes)
 | |
| 	}
 | |
| 	s.physical[conn] = cnts
 | |
| }
 | |
| 
 | |
| // preInsertConn updates the maps to handle insertion of a new connection.
 | |
| // It reports false if insertion is not allowed (i.e., after shutdown).
 | |
| func (s *Statistics) preInsertConn() bool {
 | |
| 	// Check whether insertion of a new connection will exceed maxConns.
 | |
| 	if len(s.virtual)+len(s.physical) == s.maxConns && s.maxConns > 0 {
 | |
| 		// Extract the current statistics and send it to the serializer.
 | |
| 		// Avoid blocking the network packet handling path.
 | |
| 		select {
 | |
| 		case s.connCntsCh <- s.extractLocked():
 | |
| 		default:
 | |
| 			// TODO(joetsai): Log that we are dropping an entire connCounts.
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Initialize the maps if nil.
 | |
| 	if s.virtual == nil && s.physical == nil {
 | |
| 		s.start = time.Now().UTC()
 | |
| 		s.virtual = make(map[netlogtype.Connection]netlogtype.Counts)
 | |
| 		s.physical = make(map[netlogtype.Connection]netlogtype.Counts)
 | |
| 	}
 | |
| 
 | |
| 	return s.shutdownCtx.Err() == nil
 | |
| }
 | |
| 
 | |
| func (s *Statistics) extract() connCnts {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	return s.extractLocked()
 | |
| }
 | |
| 
 | |
| func (s *Statistics) extractLocked() connCnts {
 | |
| 	if len(s.virtual)+len(s.physical) == 0 {
 | |
| 		return connCnts{}
 | |
| 	}
 | |
| 	s.end = time.Now().UTC()
 | |
| 	cc := s.connCnts
 | |
| 	s.connCnts = connCnts{}
 | |
| 	return cc
 | |
| }
 | |
| 
 | |
| // TestExtract synchronously extracts the current network statistics map
 | |
| // and resets the counters. This should only be used for testing purposes.
 | |
| func (s *Statistics) TestExtract() (virtual, physical map[netlogtype.Connection]netlogtype.Counts) {
 | |
| 	cc := s.extract()
 | |
| 	return cc.virtual, cc.physical
 | |
| }
 | |
| 
 | |
| // Shutdown performs a final flush of statistics.
 | |
| // Statistics for any subsequent calls to Update will be dropped.
 | |
| // It is safe to call Shutdown concurrently and repeatedly.
 | |
| func (s *Statistics) Shutdown(context.Context) error {
 | |
| 	s.shutdown()
 | |
| 	return s.group.Wait()
 | |
| }
 |