diff --git a/cmd/tailscale/depaware-minlinux.txt b/cmd/tailscale/depaware-minlinux.txt index b30948028..18e1a1fe7 100644 --- a/cmd/tailscale/depaware-minlinux.txt +++ b/cmd/tailscale/depaware-minlinux.txt @@ -52,7 +52,6 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep tailscale.com/ipn from tailscale.com/client/tailscale+ tailscale.com/ipn/ipnstate from tailscale.com/client/tailscale+ tailscale.com/licenses from tailscale.com/client/web+ - tailscale.com/metrics from tailscale.com/health+ tailscale.com/net/flowtrack from tailscale.com/net/packet tailscale.com/net/netaddr from tailscale.com/ipn+ tailscale.com/net/netcheck from tailscale.com/cmd/tailscale/cli @@ -94,7 +93,6 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep tailscale.com/util/clientmetric from tailscale.com/net/netcheck+ tailscale.com/util/cmpver from tailscale.com/clientupdate tailscale.com/util/ctxkey from tailscale.com/types/logger - L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics tailscale.com/util/dnsname from tailscale.com/cmd/tailscale/cli+ tailscale.com/util/groupmember from tailscale.com/client/web tailscale.com/util/httpm from tailscale.com/client/tailscale+ @@ -110,7 +108,6 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep tailscale.com/util/syspolicy/setting from tailscale.com/client/tailscale+ tailscale.com/util/testenv from tailscale.com/cmd/tailscale/cli tailscale.com/util/truncate from tailscale.com/cmd/tailscale/cli - tailscale.com/util/usermetric from tailscale.com/health tailscale.com/util/vizerror from tailscale.com/tailcfg+ tailscale.com/version from tailscale.com/client/web+ tailscale.com/version/distro from tailscale.com/client/web+ @@ -186,11 +183,10 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep encoding/binary from compress/gzip+ encoding/gob from github.com/gorilla/securecookie encoding/hex from crypto/x509+ - encoding/json from 
expvar+ + encoding/json from github.com/google/uuid+ encoding/pem from crypto/tls+ encoding/xml from github.com/tailscale/goupnp+ errors from archive/tar+ - expvar from tailscale.com/health+ flag from github.com/peterbourgon/ff/v3+ fmt from archive/tar+ hash from compress/zlib+ @@ -206,7 +202,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep io/fs from archive/tar+ io/ioutil from github.com/mitchellh/go-ps+ iter from maps+ - log from expvar+ + log from github.com/skip2/go-qrcode+ log/internal from log maps from net/http+ math from archive/tar+ @@ -218,7 +214,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep mime/multipart from net/http mime/quotedprintable from mime/multipart net from crypto/tls+ - net/http from expvar+ + net/http from github.com/gorilla/csrf+ net/http/cgi from tailscale.com/cmd/tailscale/cli net/http/httptrace from golang.org/x/net/http2+ net/http/httputil from tailscale.com/client/web+ diff --git a/cmd/tailscaled/depaware-minlinux.txt b/cmd/tailscaled/depaware-minlinux.txt index 912355aa0..9f61c741d 100644 --- a/cmd/tailscaled/depaware-minlinux.txt +++ b/cmd/tailscaled/depaware-minlinux.txt @@ -30,7 +30,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/control/controlknobs from tailscale.com/control/controlclient+ tailscale.com/derp from tailscale.com/derp/derphttp+ tailscale.com/derp/derphttp from tailscale.com/wgengine/magicsock - tailscale.com/disco from tailscale.com/derp+ + tailscale.com/disco from tailscale.com/net/tstun+ tailscale.com/envknob from tailscale.com/cmd/tailscaled+ tailscale.com/health from tailscale.com/control/controlclient+ tailscale.com/health/healthmsg from tailscale.com/ipn/ipnlocal @@ -46,7 +46,6 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/ipn/store from tailscale.com/cmd/tailscaled tailscale.com/ipn/store/mem from tailscale.com/ipn/store 
tailscale.com/logtail/backoff from tailscale.com/control/controlclient+ - tailscale.com/metrics from tailscale.com/derp+ tailscale.com/net/flowtrack from tailscale.com/net/packet+ tailscale.com/net/ipset from tailscale.com/ipn/ipnlocal+ tailscale.com/net/netaddr from tailscale.com/ipn+ @@ -72,7 +71,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/tsd from tailscale.com/cmd/tailscaled+ tailscale.com/tstime from tailscale.com/control/controlclient+ tailscale.com/tstime/mono from tailscale.com/net/tstun+ - tailscale.com/tstime/rate from tailscale.com/derp+ + tailscale.com/tstime/rate from tailscale.com/wgengine/filter tailscale.com/types/empty from tailscale.com/ipn+ tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled tailscale.com/types/ipproto from tailscale.com/ipn+ @@ -90,9 +89,8 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/types/structs from tailscale.com/control/controlclient+ tailscale.com/types/views from tailscale.com/control/controlclient+ tailscale.com/util/clientmetric from tailscale.com/control/controlclient+ - tailscale.com/util/ctxkey from tailscale.com/derp+ + tailscale.com/util/ctxkey from tailscale.com/ipn/ipnserver+ 💣 tailscale.com/util/deephash from tailscale.com/ipn/ipnlocal+ - L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics tailscale.com/util/dnsname from tailscale.com/hostinfo+ tailscale.com/util/execqueue from tailscale.com/control/controlclient tailscale.com/util/goroutines from tailscale.com/ipn/ipnlocal @@ -115,7 +113,6 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/util/sysresources from tailscale.com/wgengine/magicsock tailscale.com/util/testenv from tailscale.com/control/controlclient+ tailscale.com/util/uniq from tailscale.com/ipn/ipnlocal+ - tailscale.com/util/usermetric from tailscale.com/health+ tailscale.com/util/vizerror from tailscale.com/tailcfg+ 
tailscale.com/util/winutil from tailscale.com/ipn/ipnauth tailscale.com/util/zstdframe from tailscale.com/control/controlclient @@ -155,7 +152,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de golang.org/x/net/ipv6 from github.com/tailscale/wireguard-go/conn+ golang.org/x/net/proxy from tailscale.com/net/netns D golang.org/x/net/route from net+ - golang.org/x/sync/errgroup from github.com/mdlayher/socket+ + L golang.org/x/sync/errgroup from github.com/mdlayher/socket golang.org/x/sys/cpu from github.com/tailscale/wireguard-go/tun+ golang.org/x/sys/unix from github.com/mdlayher/socket+ golang.org/x/text/secure/bidirule from golang.org/x/net/idna @@ -190,17 +187,16 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de crypto/subtle from crypto/aes+ crypto/tls from golang.org/x/net/http2+ crypto/x509 from crypto/tls+ - crypto/x509/pkix from crypto/x509+ + crypto/x509/pkix from crypto/x509 embed from crypto/internal/nistec+ encoding from encoding/json+ encoding/asn1 from crypto/x509+ encoding/base64 from encoding/json+ encoding/binary from compress/gzip+ encoding/hex from crypto/x509+ - encoding/json from expvar+ + encoding/json from github.com/bits-and-blooms/bitset+ encoding/pem from crypto/tls+ errors from bufio+ - expvar from tailscale.com/derp+ flag from net/http/httptest+ fmt from compress/flate+ hash from crypto+ @@ -210,7 +206,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de io from bufio+ io/fs from crypto/x509+ iter from maps+ - log from expvar+ + log from github.com/klauspost/compress/zstd+ log/internal from log maps from net/http+ math from compress/flate+ @@ -222,7 +218,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de mime/multipart from net/http mime/quotedprintable from mime/multipart net from crypto/tls+ - net/http from expvar+ + net/http from golang.org/x/net/http2+ net/http/httptest from 
tailscale.com/control/controlclient net/http/httptrace from golang.org/x/net/http2+ net/http/httputil from tailscale.com/cmd/tailscaled diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index 37db34ae8..cd575039b 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -556,14 +556,11 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo ListenPort: args.port, NetMon: sys.NetMon.Get(), HealthTracker: sys.HealthTracker(), - Metrics: sys.UserMetricsRegistry(), Dialer: sys.Dialer.Get(), SetSubsystem: sys.Set, ControlKnobs: sys.ControlKnobs(), } - sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry()) - onlyNetstack = name == "userspace-networking" netstackSubnetRouter := onlyNetstack // but mutated later on some platforms netns.SetEnabled(!onlyNetstack) diff --git a/derp/derp_client.go b/derp/derp_client.go index 7a646fa51..235063876 100644 --- a/derp/derp_client.go +++ b/derp/derp_client.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "net" "net/netip" "sync" "time" @@ -22,6 +23,17 @@ import ( "tailscale.com/types/logger" ) +// Conn is the subset of the underlying net.Conn the DERP Server needs. +// It is a defined type so that non-net connections can be used. +type Conn interface { + io.WriteCloser + LocalAddr() net.Addr + // The *Deadline methods follow the semantics of net.Conn. + SetDeadline(time.Time) error + SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error +} + // Client is a DERP client. 
type Client struct { serverKey key.NodePublic // of the DERP server; not a machine or node key @@ -140,6 +152,13 @@ func (c *Client) recvServerKey() error { return nil } +type serverInfo struct { + Version int `json:"version,omitempty"` + + TokenBucketBytesPerSecond int `json:",omitempty"` + TokenBucketBytesBurst int `json:",omitempty"` +} + func (c *Client) parseServerInfo(b []byte) (*serverInfo, error) { const maxLength = nonceLen + maxInfoLen fl := len(b) diff --git a/derp/derp_server.go b/derp/derp_server.go deleted file mode 100644 index c9d0e4e09..000000000 --- a/derp/derp_server.go +++ /dev/null @@ -1,2282 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -package derp - -// TODO(crawshaw): with predefined serverKey in clients and HMAC on packets we could skip TLS - -import ( - "bufio" - "bytes" - "context" - "crypto/ed25519" - crand "crypto/rand" - "crypto/x509" - "crypto/x509/pkix" - "encoding/binary" - "encoding/json" - "errors" - "expvar" - "fmt" - "io" - "log" - "math" - "math/big" - "math/rand/v2" - "net" - "net/http" - "net/netip" - "os" - "os/exec" - "runtime" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "go4.org/mem" - "golang.org/x/sync/errgroup" - "tailscale.com/disco" - "tailscale.com/envknob" - "tailscale.com/metrics" - "tailscale.com/syncs" - "tailscale.com/tailcfg" - "tailscale.com/tstime" - "tailscale.com/tstime/rate" - "tailscale.com/types/key" - "tailscale.com/types/logger" - "tailscale.com/util/ctxkey" - "tailscale.com/util/mak" - "tailscale.com/util/set" - "tailscale.com/util/slicesx" - "tailscale.com/version" -) - -// verboseDropKeys is the set of destination public keys that should -// verbosely log whenever DERP drops a packet. -var verboseDropKeys = map[key.NodePublic]bool{} - -// IdealNodeHeader is the HTTP request header sent on DERP HTTP client requests -// to indicate that they're connecting to their ideal (Region.Nodes[0]) node. 
-// The HTTP header value is the name of the node they wish they were connected -// to. This is an optional header. -const IdealNodeHeader = "Ideal-Node" - -// IdealNodeContextKey is the context key used to pass the IdealNodeHeader value -// from the HTTP handler to the DERP server's Accept method. -var IdealNodeContextKey = ctxkey.New[string]("ideal-node", "") - -func init() { - keys := envknob.String("TS_DEBUG_VERBOSE_DROPS") - if keys == "" { - return - } - for _, keyStr := range strings.Split(keys, ",") { - k, err := key.ParseNodePublicUntyped(mem.S(keyStr)) - if err != nil { - log.Printf("ignoring invalid debug key %q: %v", keyStr, err) - } else { - verboseDropKeys[k] = true - } - } -} - -const ( - defaultPerClientSendQueueDepth = 32 // default packets buffered for sending - writeTimeout = 2 * time.Second - privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key -) - -func getPerClientSendQueueDepth() int { - if v, ok := envknob.LookupInt("TS_DEBUG_DERP_PER_CLIENT_SEND_QUEUE_DEPTH"); ok { - return v - } - - return defaultPerClientSendQueueDepth -} - -// dupPolicy is a temporary (2021-08-30) mechanism to change the policy -// of how duplicate connection for the same key are handled. -type dupPolicy int8 - -const ( - // lastWriterIsActive is a dupPolicy where the connection - // to send traffic for a peer is the active one. - lastWriterIsActive dupPolicy = iota - - // disableFighters is a dupPolicy that detects if peers - // are trying to send interleaved with each other and - // then disables all of them. - disableFighters -) - -type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment - -// Server is a DERP server. -type Server struct { - // WriteTimeout, if non-zero, specifies how long to wait - // before failing when writing to a client. 
- WriteTimeout time.Duration - - privateKey key.NodePrivate - publicKey key.NodePublic - logf logger.Logf - memSys0 uint64 // runtime.MemStats.Sys at start (or early-ish) - meshKey string - limitedLogf logger.Logf - metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate - dupPolicy dupPolicy - debug bool - - // Counters: - packetsSent, bytesSent expvar.Int - packetsRecv, bytesRecv expvar.Int - packetsRecvByKind metrics.LabelMap - packetsRecvDisco *expvar.Int - packetsRecvOther *expvar.Int - _ align64 - packetsDropped expvar.Int - packetsDroppedReason metrics.LabelMap - packetsDroppedReasonCounters []*expvar.Int // indexed by dropReason - packetsDroppedType metrics.LabelMap - packetsDroppedTypeDisco *expvar.Int - packetsDroppedTypeOther *expvar.Int - _ align64 - packetsForwardedOut expvar.Int - packetsForwardedIn expvar.Int - peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent - peerGoneNotHereFrames expvar.Int // number of peer not here frames sent - gotPing expvar.Int // number of ping frames from client - sentPong expvar.Int // number of pong frames enqueued to client - accepts expvar.Int - curClients expvar.Int - curClientsNotIdeal expvar.Int - curHomeClients expvar.Int // ones with preferred - dupClientKeys expvar.Int // current number of public keys we have 2+ connections for - dupClientConns expvar.Int // current number of connections sharing a public key - dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed - unknownFrames expvar.Int - homeMovesIn expvar.Int // established clients announce home server moves in - homeMovesOut expvar.Int // established clients announce home server moves out - multiForwarderCreated expvar.Int - multiForwarderDeleted expvar.Int - removePktForwardOther expvar.Int - sclientWriteTimeouts expvar.Int - avgQueueDuration *uint64 // In milliseconds; accessed atomically - tcpRtt metrics.LabelMap // histogram - meshUpdateBatchSize 
*metrics.Histogram - meshUpdateLoopCount *metrics.Histogram - bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush - - // verifyClientsLocalTailscaled only accepts client connections to the DERP - // server if the clientKey is a known peer in the network, as specified by a - // running tailscaled's client's LocalAPI. - verifyClientsLocalTailscaled bool - - verifyClientsURL string - verifyClientsURLFailOpen bool - - mu sync.Mutex - closed bool - netConns map[Conn]chan struct{} // chan is closed when conn closes - clients map[key.NodePublic]*clientSet - watchers set.Set[*sclient] // mesh peers - // clientsMesh tracks all clients in the cluster, both locally - // and to mesh peers. If the value is nil, that means the - // peer is only local (and thus in the clients Map, but not - // remote). If the value is non-nil, it's remote (+ maybe also - // local). - clientsMesh map[key.NodePublic]PacketForwarder - // peerGoneWatchers is the set of watchers that subscribed to a - // peer disconnecting from the region overall. When a peer - // is gone from the region, we notify all of these watchers, - // calling their funcs in a new goroutine. - peerGoneWatchers map[key.NodePublic]set.HandleSet[func(key.NodePublic)] - - // maps from netip.AddrPort to a client's public key - keyOfAddr map[netip.AddrPort]key.NodePublic - - // Sets the client send queue depth for the server. - perClientSendQueueDepth int - - clock tstime.Clock -} - -// clientSet represents 1 or more *sclients. -// -// In the common case, client should only have one connection to the -// DERP server for a given key. When they're connected multiple times, -// we record their set of connections in dupClientSet and keep their -// connections open to make them happy (to keep them from spinning, -// etc) and keep track of which is the latest connection. If only the last -// is sending traffic, that last one is the active connection and it -// gets traffic. 
Otherwise, in the case of a cloned node key, the -// whole set of dups doesn't receive data frames. -// -// All methods should only be called while holding Server.mu. -// -// TODO(bradfitz): Issue 2746: in the future we'll send some sort of -// "health_error" frame to them that'll communicate to the end users -// that they cloned a device key, and we'll also surface it in the -// admin panel, etc. -type clientSet struct { - // activeClient holds the currently active connection for the set. It's nil - // if there are no connections or the connection is disabled. - // - // A pointer to a clientSet can be held by peers for long periods of time - // without holding Server.mu to avoid mutex contention on Server.mu, only - // re-acquiring the mutex and checking the clients map if activeClient is - // nil. - activeClient atomic.Pointer[sclient] - - // dup is non-nil if there are multiple connections for the - // public key. It's nil in the common case of only one - // client being connected. - // - // dup is guarded by Server.mu. - dup *dupClientSet -} - -// Len returns the number of clients in s, which can be -// 0, 1 (the common case), or more (for buggy or transiently -// reconnecting clients). -func (s *clientSet) Len() int { - if s.dup != nil { - return len(s.dup.set) - } - if s.activeClient.Load() != nil { - return 1 - } - return 0 -} - -// ForeachClient calls f for each client in the set. -// -// The Server.mu must be held. -func (s *clientSet) ForeachClient(f func(*sclient)) { - if s.dup != nil { - for c := range s.dup.set { - f(c) - } - } else if c := s.activeClient.Load(); c != nil { - f(c) - } -} - -// A dupClientSet is a clientSet of more than 1 connection. -// -// This can occur in some reasonable cases (temporarily while users -// are changing networks) or in the case of a cloned key. In the -// cloned key case, both peers are speaking and the clients get -// disabled. -// -// All fields are guarded by Server.mu. 
-type dupClientSet struct { - // set is the set of connected clients for sclient.key, - // including the clientSet's active one. - set set.Set[*sclient] - - // last is the most recent addition to set, or nil if the most - // recent one has since disconnected and nobody else has sent - // data since. - last *sclient - - // sendHistory is a log of which members of set have sent - // frames to the derp server, with adjacent duplicates - // removed. When a member of set is removed, the same - // element(s) are removed from sendHistory. - sendHistory []*sclient -} - -func (s *clientSet) pickActiveClient() *sclient { - d := s.dup - if d == nil { - return s.activeClient.Load() - } - if d.last != nil && !d.last.isDisabled.Load() { - return d.last - } - return nil -} - -// removeClient removes c from s and reports whether it was in s -// to begin with. -func (s *dupClientSet) removeClient(c *sclient) bool { - n := len(s.set) - delete(s.set, c) - if s.last == c { - s.last = nil - } - if len(s.set) == n { - return false - } - - trim := s.sendHistory[:0] - for _, v := range s.sendHistory { - if s.set.Contains(v) && (len(trim) == 0 || trim[len(trim)-1] != v) { - trim = append(trim, v) - } - } - for i := len(trim); i < len(s.sendHistory); i++ { - s.sendHistory[i] = nil - } - s.sendHistory = trim - if s.last == nil && len(s.sendHistory) > 0 { - s.last = s.sendHistory[len(s.sendHistory)-1] - } - return true -} - -// PacketForwarder is something that can forward packets. -// -// It's mostly an interface for circular dependency reasons; the -// typical implementation is derphttp.Client. The other implementation -// is a multiForwarder, which this package creates as needed if a -// public key gets more than one PacketForwarder registered for it. -type PacketForwarder interface { - ForwardPacket(src, dst key.NodePublic, payload []byte) error - String() string -} - -// Conn is the subset of the underlying net.Conn the DERP Server needs. 
-// It is a defined type so that non-net connections can be used. -type Conn interface { - io.WriteCloser - LocalAddr() net.Addr - // The *Deadline methods follow the semantics of net.Conn. - SetDeadline(time.Time) error - SetReadDeadline(time.Time) error - SetWriteDeadline(time.Time) error -} - -// NewServer returns a new DERP server. It doesn't listen on its own. -// Connections are given to it via Server.Accept. -func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { - var ms runtime.MemStats - runtime.ReadMemStats(&ms) - - s := &Server{ - debug: envknob.Bool("DERP_DEBUG_LOGS"), - privateKey: privateKey, - publicKey: privateKey.Public(), - logf: logf, - limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100), - packetsRecvByKind: metrics.LabelMap{Label: "kind"}, - packetsDroppedReason: metrics.LabelMap{Label: "reason"}, - packetsDroppedType: metrics.LabelMap{Label: "type"}, - clients: map[key.NodePublic]*clientSet{}, - clientsMesh: map[key.NodePublic]PacketForwarder{}, - netConns: map[Conn]chan struct{}{}, - memSys0: ms.Sys, - watchers: set.Set[*sclient]{}, - peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{}, - avgQueueDuration: new(uint64), - tcpRtt: metrics.LabelMap{Label: "le"}, - meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}), - meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}), - bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}), - keyOfAddr: map[netip.AddrPort]key.NodePublic{}, - clock: tstime.StdClock{}, - } - s.initMetacert() - s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") - s.packetsRecvOther = s.packetsRecvByKind.Get("other") - - s.packetsDroppedReasonCounters = s.genPacketsDroppedReasonCounters() - - s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco") - s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other") - - s.perClientSendQueueDepth = 
getPerClientSendQueueDepth() - return s -} - -func (s *Server) genPacketsDroppedReasonCounters() []*expvar.Int { - getMetric := s.packetsDroppedReason.Get - ret := []*expvar.Int{ - dropReasonUnknownDest: getMetric("unknown_dest"), - dropReasonUnknownDestOnFwd: getMetric("unknown_dest_on_fwd"), - dropReasonGoneDisconnected: getMetric("gone_disconnected"), - dropReasonQueueHead: getMetric("queue_head"), - dropReasonQueueTail: getMetric("queue_tail"), - dropReasonWriteError: getMetric("write_error"), - dropReasonDupClient: getMetric("dup_client"), - } - if len(ret) != int(numDropReasons) { - panic("dropReason metrics out of sync") - } - for i := range numDropReasons { - if ret[i] == nil { - panic("dropReason metrics out of sync") - } - } - return ret -} - -// SetMesh sets the pre-shared key that regional DERP servers used to mesh -// amongst themselves. -// -// It must be called before serving begins. -func (s *Server) SetMeshKey(v string) { - s.meshKey = v -} - -// SetVerifyClients sets whether this DERP server verifies clients through tailscaled. -// -// It must be called before serving begins. -func (s *Server) SetVerifyClient(v bool) { - s.verifyClientsLocalTailscaled = v -} - -// SetVerifyClientURL sets the admission controller URL to use for verifying clients. -// If empty, all clients are accepted (unless restricted by SetVerifyClient checking -// against tailscaled). -func (s *Server) SetVerifyClientURL(v string) { - s.verifyClientsURL = v -} - -// SetVerifyClientURLFailOpen sets whether to allow clients to connect if the -// admission controller URL is unreachable. -func (s *Server) SetVerifyClientURLFailOpen(v bool) { - s.verifyClientsURLFailOpen = v -} - -// HasMeshKey reports whether the server is configured with a mesh key. -func (s *Server) HasMeshKey() bool { return s.meshKey != "" } - -// MeshKey returns the configured mesh key, if any. -func (s *Server) MeshKey() string { return s.meshKey } - -// PrivateKey returns the server's private key. 
-func (s *Server) PrivateKey() key.NodePrivate { return s.privateKey } - -// PublicKey returns the server's public key. -func (s *Server) PublicKey() key.NodePublic { return s.publicKey } - -// Close closes the server and waits for the connections to disconnect. -func (s *Server) Close() error { - s.mu.Lock() - wasClosed := s.closed - s.closed = true - s.mu.Unlock() - if wasClosed { - return nil - } - - var closedChs []chan struct{} - - s.mu.Lock() - for nc, closed := range s.netConns { - nc.Close() - closedChs = append(closedChs, closed) - } - s.mu.Unlock() - - for _, closed := range closedChs { - <-closed - } - - return nil -} - -func (s *Server) isClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.closed -} - -// IsClientConnectedForTest reports whether the client with specified key is connected. -// This is used in tests to verify that nodes are connected. -func (s *Server) IsClientConnectedForTest(k key.NodePublic) bool { - s.mu.Lock() - defer s.mu.Unlock() - x, ok := s.clients[k] - if !ok { - return false - } - return x.activeClient.Load() != nil -} - -// Accept adds a new connection to the server and serves it. -// -// The provided bufio ReadWriter must be already connected to nc. -// Accept blocks until the Server is closed or the connection closes -// on its own. -// -// Accept closes nc. 
-func (s *Server) Accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string) { - closed := make(chan struct{}) - - s.mu.Lock() - s.accepts.Add(1) // while holding s.mu for connNum read on next line - connNum := s.accepts.Value() // expvar sadly doesn't return new value on Add(1) - s.netConns[nc] = closed - s.mu.Unlock() - - defer func() { - nc.Close() - close(closed) - - s.mu.Lock() - delete(s.netConns, nc) - s.mu.Unlock() - }() - - if err := s.accept(ctx, nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() { - s.logf("derp: %s: %v", remoteAddr, err) - } -} - -// initMetacert initialized s.metaCert with a self-signed x509 cert -// encoding this server's public key and protocol version. cmd/derper -// then sends this after the Let's Encrypt leaf + intermediate certs -// after the ServerHello (encrypted in TLS 1.3, not that it matters -// much). -// -// Then the client can save a round trip getting that and can start -// speaking DERP right away. (We don't use ALPN because that's sent in -// the clear and we're being paranoid to not look too weird to any -// middleboxes, given that DERP is an ultimate fallback path). But -// since the post-ServerHello certs are encrypted we can have the -// client also use them as a signal to be able to start speaking DERP -// right away, starting with its identity proof, encrypted to the -// server's public key. -// -// This RTT optimization fails where there's a corp-mandated -// TLS proxy with corp-mandated root certs on employee machines and -// and TLS proxy cleans up unnecessary certs. In that case we just fall -// back to the extra RTT. 
-func (s *Server) initMetacert() { - pub, priv, err := ed25519.GenerateKey(crand.Reader) - if err != nil { - log.Fatal(err) - } - tmpl := &x509.Certificate{ - SerialNumber: big.NewInt(ProtocolVersion), - Subject: pkix.Name{ - CommonName: fmt.Sprintf("derpkey%s", s.publicKey.UntypedHexString()), - }, - // Windows requires NotAfter and NotBefore set: - NotAfter: s.clock.Now().Add(30 * 24 * time.Hour), - NotBefore: s.clock.Now().Add(-30 * 24 * time.Hour), - // Per https://github.com/golang/go/issues/51759#issuecomment-1071147836, - // macOS requires BasicConstraints when subject == issuer: - BasicConstraintsValid: true, - } - cert, err := x509.CreateCertificate(crand.Reader, tmpl, tmpl, pub, priv) - if err != nil { - log.Fatalf("CreateCertificate: %v", err) - } - s.metaCert = cert -} - -// MetaCert returns the server metadata cert that can be sent by the -// TLS server to let the client skip a round trip during start-up. -func (s *Server) MetaCert() []byte { return s.metaCert } - -// registerClient notes that client c is now authenticated and ready for packets. -// -// If c.key is connected more than once, the earlier connection(s) are -// placed in a non-active state where we read from them (primarily to -// observe EOFs/timeouts) but won't send them frames on the assumption -// that they're dead. -func (s *Server) registerClient(c *sclient) { - s.mu.Lock() - defer s.mu.Unlock() - - cs, ok := s.clients[c.key] - if !ok { - c.debugLogf("register single client") - cs = &clientSet{} - s.clients[c.key] = cs - } - was := cs.activeClient.Load() - if was == nil { - // Common case. 
- } else { - was.isDup.Store(true) - c.isDup.Store(true) - } - - dup := cs.dup - if dup == nil && was != nil { - s.dupClientKeys.Add(1) - s.dupClientConns.Add(2) // both old and new count - s.dupClientConnTotal.Add(1) - dup = &dupClientSet{ - set: set.Of(c, was), - last: c, - sendHistory: []*sclient{was}, - } - cs.dup = dup - c.debugLogf("register duplicate client") - } else if dup != nil { - s.dupClientConns.Add(1) // the gauge - s.dupClientConnTotal.Add(1) // the counter - dup.set.Add(c) - dup.last = c - dup.sendHistory = append(dup.sendHistory, c) - c.debugLogf("register another duplicate client") - } - - cs.activeClient.Store(c) - - if _, ok := s.clientsMesh[c.key]; !ok { - s.clientsMesh[c.key] = nil // just for varz of total users in cluster - } - s.keyOfAddr[c.remoteIPPort] = c.key - s.curClients.Add(1) - if c.isNotIdealConn { - s.curClientsNotIdeal.Add(1) - } - s.broadcastPeerStateChangeLocked(c.key, c.remoteIPPort, c.presentFlags(), true) -} - -// broadcastPeerStateChangeLocked enqueues a message to all watchers -// (other DERP nodes in the region, or trusted clients) that peer's -// presence changed. -// -// s.mu must be held. -func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags, present bool) { - for w := range s.watchers { - w.peerStateChange = append(w.peerStateChange, peerConnState{ - peer: peer, - present: present, - ipPort: ipPort, - flags: flags, - }) - go w.requestMeshUpdate() - } -} - -// unregisterClient removes a client from the server. -func (s *Server) unregisterClient(c *sclient) { - s.mu.Lock() - defer s.mu.Unlock() - - set, ok := s.clients[c.key] - if !ok { - c.logf("[unexpected]; clients map is empty") - return - } - - dup := set.dup - if dup == nil { - // The common case. 
- cur := set.activeClient.Load() - if cur == nil { - c.logf("[unexpected]; active client is nil") - return - } - if cur != c { - c.logf("[unexpected]; active client is not c") - return - } - c.debugLogf("removed connection") - set.activeClient.Store(nil) - delete(s.clients, c.key) - if v, ok := s.clientsMesh[c.key]; ok && v == nil { - delete(s.clientsMesh, c.key) - s.notePeerGoneFromRegionLocked(c.key) - } - s.broadcastPeerStateChangeLocked(c.key, netip.AddrPort{}, 0, false) - } else { - c.debugLogf("removed duplicate client") - if dup.removeClient(c) { - s.dupClientConns.Add(-1) - } else { - c.logf("[unexpected]; dup client set didn't shrink") - } - if dup.set.Len() == 1 { - // If we drop down to one connection, demote it down - // to a regular single client (a nil dup set). - set.dup = nil - s.dupClientConns.Add(-1) // again; for the original one's - s.dupClientKeys.Add(-1) - var remain *sclient - for remain = range dup.set { - break - } - if remain == nil { - panic("unexpected nil remain from single element dup set") - } - remain.isDisabled.Store(false) - remain.isDup.Store(false) - set.activeClient.Store(remain) - } else { - // Still a duplicate. Pick a winner. - set.activeClient.Store(set.pickActiveClient()) - } - } - - if c.canMesh { - delete(s.watchers, c) - } - - delete(s.keyOfAddr, c.remoteIPPort) - - s.curClients.Add(-1) - if c.preferred { - s.curHomeClients.Add(-1) - } - if c.isNotIdealConn { - s.curClientsNotIdeal.Add(-1) - } -} - -// addPeerGoneFromRegionWatcher adds a function to be called when peer is gone -// from the region overall. It returns a handle that can be used to remove the -// watcher later. -// -// The provided f func is usually [sclient.onPeerGoneFromRegion], added by -// [sclient.noteSendFromSrc]; this func doesn't take a whole *sclient to make it -// clear what has access to what. 
-func (s *Server) addPeerGoneFromRegionWatcher(peer key.NodePublic, f func(key.NodePublic)) set.Handle { - s.mu.Lock() - defer s.mu.Unlock() - hset, ok := s.peerGoneWatchers[peer] - if !ok { - hset = set.HandleSet[func(key.NodePublic)]{} - s.peerGoneWatchers[peer] = hset - } - return hset.Add(f) -} - -// removePeerGoneFromRegionWatcher removes a peer watcher previously added by -// addPeerGoneFromRegionWatcher, using the handle returned by -// addPeerGoneFromRegionWatcher. -func (s *Server) removePeerGoneFromRegionWatcher(peer key.NodePublic, h set.Handle) { - s.mu.Lock() - defer s.mu.Unlock() - hset, ok := s.peerGoneWatchers[peer] - if !ok { - return - } - delete(hset, h) - if len(hset) == 0 { - delete(s.peerGoneWatchers, peer) - } -} - -// notePeerGoneFromRegionLocked sends peerGone frames to parties that -// key has sent to previously (whether those sends were from a local -// client or forwarded). It must only be called after the key has -// been removed from clientsMesh. -func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) { - if _, ok := s.clientsMesh[key]; ok { - panic("usage") - } - - // Find still-connected peers and either notify that we've gone away - // so they can drop their route entries to us (issue 150) - // or move them over to the active client (in case a replaced client - // connection is being unregistered). - set := s.peerGoneWatchers[key] - for _, f := range set { - go f(key) - } - delete(s.peerGoneWatchers, key) -} - -// requestPeerGoneWriteLimited sends a request to write a "peer gone" -// frame, but only in reply to a disco packet, and only if we haven't -// sent one recently. 
-func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason PeerGoneReasonType) { - if disco.LooksLikeDiscoWrapper(contents) != true { - return - } - - if c.peerGoneLim.Allow() { - go c.requestPeerGoneWrite(peer, reason) - } -} - -func (s *Server) addWatcher(c *sclient) { - if !c.canMesh { - panic("invariant: addWatcher called without permissions") - } - - if c.key == s.publicKey { - // We're connecting to ourself. Do nothing. - return - } - - s.mu.Lock() - defer s.mu.Unlock() - - // Queue messages for each already-connected client. - for peer, clientSet := range s.clients { - ac := clientSet.activeClient.Load() - if ac == nil { - continue - } - c.peerStateChange = append(c.peerStateChange, peerConnState{ - peer: peer, - present: true, - ipPort: ac.remoteIPPort, - flags: ac.presentFlags(), - }) - } - - // And enroll the watcher in future updates (of both - // connections & disconnections). - s.watchers.Add(c) - - go c.requestMeshUpdate() -} - -func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error { - br := brw.Reader - nc.SetDeadline(time.Now().Add(10 * time.Second)) - bw := &lazyBufioWriter{w: nc, lbw: brw.Writer} - if err := s.sendServerKey(bw); err != nil { - return fmt.Errorf("send server key: %v", err) - } - nc.SetDeadline(time.Now().Add(10 * time.Second)) - clientKey, clientInfo, err := s.recvClientKey(br) - if err != nil { - return fmt.Errorf("receive client key: %v", err) - } - - remoteIPPort, _ := netip.ParseAddrPort(remoteAddr) - if err := s.verifyClient(ctx, clientKey, clientInfo, remoteIPPort.Addr()); err != nil { - return fmt.Errorf("client %v rejected: %v", clientKey, err) - } - - // At this point we trust the client so we don't time out. 
- nc.SetDeadline(time.Time{}) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - c := &sclient{ - connNum: connNum, - s: s, - key: clientKey, - nc: nc, - br: br, - bw: bw, - logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v%s: ", remoteAddr, clientKey.ShortString())), - done: ctx.Done(), - remoteIPPort: remoteIPPort, - connectedAt: s.clock.Now(), - sendQueue: make(chan pkt, s.perClientSendQueueDepth), - discoSendQueue: make(chan pkt, s.perClientSendQueueDepth), - sendPongCh: make(chan [8]byte, 1), - peerGone: make(chan peerGoneMsg), - canMesh: s.isMeshPeer(clientInfo), - isNotIdealConn: IdealNodeContextKey.Value(ctx) != "", - peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3), - } - - if c.canMesh { - c.meshUpdate = make(chan struct{}, 1) // must be buffered; >1 is fine but wasteful - } - if clientInfo != nil { - c.info = *clientInfo - if envknob.Bool("DERP_PROBER_DEBUG_LOGS") && clientInfo.IsProber { - c.debug = true - } - } - if s.debug { - c.debug = true - } - - s.registerClient(c) - defer s.unregisterClient(c) - - err = s.sendServerInfo(c.bw, clientKey) - if err != nil { - return fmt.Errorf("send server info: %v", err) - } - - return c.run(ctx) -} - -func (s *Server) debugLogf(format string, v ...any) { - if s.debug { - s.logf(format, v...) - } -} - -// run serves the client until there's an error. -// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error. -func (c *sclient) run(ctx context.Context) error { - // Launch sender, but don't return from run until sender goroutine is done. 
- var grp errgroup.Group - sendCtx, cancelSender := context.WithCancel(ctx) - grp.Go(func() error { return c.sendLoop(sendCtx) }) - defer func() { - cancelSender() - if err := grp.Wait(); err != nil && !c.s.isClosed() { - if errors.Is(err, context.Canceled) { - c.debugLogf("sender canceled by reader exiting") - } else { - if errors.Is(err, os.ErrDeadlineExceeded) { - c.s.sclientWriteTimeouts.Add(1) - } - c.logf("sender failed: %v", err) - } - } - }() - - c.startStatsLoop(sendCtx) - - for { - ft, fl, err := readFrameHeader(c.br) - c.debugLogf("read frame type %d len %d err %v", ft, fl, err) - if err != nil { - if errors.Is(err, io.EOF) { - c.debugLogf("read EOF") - return nil - } - if c.s.isClosed() { - c.logf("closing; server closed") - return nil - } - return fmt.Errorf("client %s: readFrameHeader: %w", c.key.ShortString(), err) - } - c.s.noteClientActivity(c) - switch ft { - case frameNotePreferred: - err = c.handleFrameNotePreferred(ft, fl) - case frameSendPacket: - err = c.handleFrameSendPacket(ft, fl) - case frameForwardPacket: - err = c.handleFrameForwardPacket(ft, fl) - case frameWatchConns: - err = c.handleFrameWatchConns(ft, fl) - case frameClosePeer: - err = c.handleFrameClosePeer(ft, fl) - case framePing: - err = c.handleFramePing(ft, fl) - default: - err = c.handleUnknownFrame(ft, fl) - } - if err != nil { - return err - } - } -} - -func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error { - _, err := io.CopyN(io.Discard, c.br, int64(fl)) - return err -} - -func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error { - if fl != 1 { - return fmt.Errorf("frameNotePreferred wrong size") - } - v, err := c.br.ReadByte() - if err != nil { - return fmt.Errorf("frameNotePreferred ReadByte: %v", err) - } - c.setPreferred(v != 0) - return nil -} - -func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error { - if fl != 0 { - return fmt.Errorf("handleFrameWatchConns wrong size") - } - if !c.canMesh { - return 
fmt.Errorf("insufficient permissions") - } - c.s.addWatcher(c) - return nil -} - -func (c *sclient) handleFramePing(ft frameType, fl uint32) error { - c.s.gotPing.Add(1) - var m PingMessage - if fl < uint32(len(m)) { - return fmt.Errorf("short ping: %v", fl) - } - if fl > 1000 { - // unreasonably extra large. We leave some extra - // space for future extensibility, but not too much. - return fmt.Errorf("ping body too large: %v", fl) - } - _, err := io.ReadFull(c.br, m[:]) - if err != nil { - return err - } - if extra := int64(fl) - int64(len(m)); extra > 0 { - _, err = io.CopyN(io.Discard, c.br, extra) - } - select { - case c.sendPongCh <- [8]byte(m): - default: - // They're pinging too fast. Ignore. - // TODO(bradfitz): add a rate limiter too. - } - return err -} - -func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error { - if fl != keyLen { - return fmt.Errorf("handleFrameClosePeer wrong size") - } - if !c.canMesh { - return fmt.Errorf("insufficient permissions") - } - var targetKey key.NodePublic - if err := targetKey.ReadRawWithoutAllocating(c.br); err != nil { - return err - } - s := c.s - - s.mu.Lock() - defer s.mu.Unlock() - - if set, ok := s.clients[targetKey]; ok { - if set.Len() == 1 { - c.logf("frameClosePeer closing peer %x", targetKey) - } else { - c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len()) - } - set.ForeachClient(func(target *sclient) { - go target.nc.Close() - }) - } else { - c.logf("frameClosePeer failed to find peer %x", targetKey) - } - - return nil -} - -// handleFrameForwardPacket reads a "forward packet" frame from the client -// (which must be a trusted client, a peer in our mesh). 
-func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { - if !c.canMesh { - return fmt.Errorf("insufficient permissions") - } - s := c.s - - srcKey, dstKey, contents, err := s.recvForwardPacket(c.br, fl) - if err != nil { - return fmt.Errorf("client %v: recvForwardPacket: %v", c.key, err) - } - s.packetsForwardedIn.Add(1) - - var dstLen int - var dst *sclient - - s.mu.Lock() - if set, ok := s.clients[dstKey]; ok { - dstLen = set.Len() - dst = set.activeClient.Load() - } - s.mu.Unlock() - - if dst == nil { - reason := dropReasonUnknownDestOnFwd - if dstLen > 1 { - reason = dropReasonDupClient - } else { - c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) - } - s.recordDrop(contents, srcKey, dstKey, reason) - return nil - } - - dst.debugLogf("received forwarded packet from %s via %s", srcKey.ShortString(), c.key.ShortString()) - - return c.sendPkt(dst, pkt{ - bs: contents, - enqueuedAt: c.s.clock.Now(), - src: srcKey, - }) -} - -// handleFrameSendPacket reads a "send packet" frame from the client. 
-func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { - s := c.s - - dstKey, contents, err := s.recvPacket(c.br, fl) - if err != nil { - return fmt.Errorf("client %v: recvPacket: %v", c.key, err) - } - - var fwd PacketForwarder - var dstLen int - var dst *sclient - - s.mu.Lock() - if set, ok := s.clients[dstKey]; ok { - dstLen = set.Len() - dst = set.activeClient.Load() - } - if dst == nil && dstLen < 1 { - fwd = s.clientsMesh[dstKey] - } - s.mu.Unlock() - - if dst == nil { - if fwd != nil { - s.packetsForwardedOut.Add(1) - err := fwd.ForwardPacket(c.key, dstKey, contents) - c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err) - if err != nil { - // TODO: - return nil - } - return nil - } - reason := dropReasonUnknownDest - if dstLen > 1 { - reason = dropReasonDupClient - } else { - c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) - } - s.recordDrop(contents, c.key, dstKey, reason) - c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason) - return nil - } - c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString()) - - p := pkt{ - bs: contents, - enqueuedAt: c.s.clock.Now(), - src: c.key, - } - return c.sendPkt(dst, p) -} - -func (c *sclient) debugLogf(format string, v ...any) { - if c.debug { - c.logf(format, v...) - } -} - -// dropReason is why we dropped a DERP frame. 
-type dropReason int - -//go:generate go run tailscale.com/cmd/addlicense -file dropreason_string.go go run golang.org/x/tools/cmd/stringer -type=dropReason -trimprefix=dropReason - -const ( - dropReasonUnknownDest dropReason = iota // unknown destination pubkey - dropReasonUnknownDestOnFwd // unknown destination pubkey on a derp-forwarded packet - dropReasonGoneDisconnected // destination tailscaled disconnected before we could send - dropReasonQueueHead // destination queue is full, dropped packet at queue head - dropReasonQueueTail // destination queue is full, dropped packet at queue tail - dropReasonWriteError // OS write() failed - dropReasonDupClient // the public key is connected 2+ times (active/active, fighting) - numDropReasons // unused; keep last -) - -func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) { - s.packetsDropped.Add(1) - s.packetsDroppedReasonCounters[reason].Add(1) - looksDisco := disco.LooksLikeDiscoWrapper(packetBytes) - if looksDisco { - s.packetsDroppedTypeDisco.Add(1) - } else { - s.packetsDroppedTypeOther.Add(1) - } - if verboseDropKeys[dstKey] { - // Preformat the log string prior to calling limitedLogf. The - // limiter acts based on the format string, and we want to - // rate-limit per src/dst keys, not on the generic "dropped - // stuff" message. - msg := fmt.Sprintf("drop (%s) %s -> %s", srcKey.ShortString(), reason, dstKey.ShortString()) - s.limitedLogf(msg) - } - s.debugLogf("dropping packet reason=%s dst=%s disco=%v", reason, dstKey, looksDisco) -} - -func (c *sclient) sendPkt(dst *sclient, p pkt) error { - s := c.s - dstKey := dst.key - - // Attempt to queue for sending up to 3 times. On each attempt, if - // the queue is full, try to drop from queue head to prioritize - // fresher packets. 
- sendQueue := dst.sendQueue - if disco.LooksLikeDiscoWrapper(p.bs) { - sendQueue = dst.discoSendQueue - } - for attempt := 0; attempt < 3; attempt++ { - select { - case <-dst.done: - s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected) - dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt) - return nil - default: - } - select { - case sendQueue <- p: - dst.debugLogf("sendPkt attempt %d enqueued", attempt) - return nil - default: - } - - select { - case pkt := <-sendQueue: - s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead) - c.recordQueueTime(pkt.enqueuedAt) - default: - } - } - // Failed to make room for packet. This can happen in a heavily - // contended queue with racing writers. Give up and tail-drop in - // this case to keep reader unblocked. - s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail) - dst.debugLogf("sendPkt attempt %d dropped, queue full") - - return nil -} - -// onPeerGoneFromRegion is the callback registered with the Server to be -// notified (in a new goroutine) whenever a peer has disconnected from all DERP -// nodes in the current region. -func (c *sclient) onPeerGoneFromRegion(peer key.NodePublic) { - c.requestPeerGoneWrite(peer, PeerGoneReasonDisconnected) -} - -// requestPeerGoneWrite sends a request to write a "peer gone" frame -// with an explanation of why it is gone. It blocks until either the -// write request is scheduled, or the client has closed. -func (c *sclient) requestPeerGoneWrite(peer key.NodePublic, reason PeerGoneReasonType) { - select { - case c.peerGone <- peerGoneMsg{ - peer: peer, - reason: reason, - }: - case <-c.done: - } -} - -// requestMeshUpdate notes that a c's peerStateChange has been appended to and -// should now be written. -// -// It does not block. If a meshUpdate is already pending for this client, it -// does nothing. 
func (c *sclient) requestMeshUpdate() {
	if !c.canMesh {
		panic("unexpected requestMeshUpdate")
	}
	// Non-blocking send: meshUpdate is buffered (size 1), so multiple
	// pending requests coalesce into a single wakeup of the send loop.
	select {
	case c.meshUpdate <- struct{}{}:
	default:
	}
}

// isMeshPeer reports whether the client is a trusted mesh peer
// node in the DERP region.
func (s *Server) isMeshPeer(info *clientInfo) bool {
	return info != nil && info.MeshKey != "" && info.MeshKey == s.meshKey
}

// verifyClient checks whether the client is allowed to connect to the derper,
// depending on how & whether the server's been configured to verify.
func (s *Server) verifyClient(ctx context.Context, clientKey key.NodePublic, info *clientInfo, clientIP netip.Addr) error {
	if s.isMeshPeer(info) {
		// Trusted mesh peer. No need to verify further. In fact, verifying
		// further wouldn't work: it's not part of the tailnet so tailscaled and
		// likely the admission control URL wouldn't know about it.
		return nil
	}

	// tailscaled-based verification:
	if s.verifyClientsLocalTailscaled {
		// NOTE(review): this path unconditionally rejects every client when
		// local-tailscaled verification is enabled; presumably the real
		// check lives elsewhere or was stubbed out — confirm before
		// relying on this behavior.
		return errors.New("lanscaping")
	}

	// admission controller-based verification:
	if s.verifyClientsURL != "" {
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		jreq, err := json.Marshal(&tailcfg.DERPAdmitClientRequest{
			NodePublic: clientKey,
			Source:     clientIP,
		})
		if err != nil {
			return err
		}
		req, err := http.NewRequestWithContext(ctx, "POST", s.verifyClientsURL, bytes.NewReader(jreq))
		if err != nil {
			return err
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			if s.verifyClientsURLFailOpen {
				s.logf("admission controller unreachable; allowing client %v", clientKey)
				return nil
			}
			return err
		}
		defer res.Body.Close()
		if res.StatusCode != 200 {
			return fmt.Errorf("admission controller: %v", res.Status)
		}
		// Limit how much of the response body we'll read, since it's
		// from a configured-but-still-external service.
		var jres tailcfg.DERPAdmitClientResponse
		if err := json.NewDecoder(io.LimitReader(res.Body, 4<<10)).Decode(&jres); err != nil {
			return err
		}
		if !jres.Allow {
			return fmt.Errorf("admission controller: %v/%v not allowed", clientKey, clientIP)
		}
		// TODO(bradfitz): add policy for configurable bandwidth rate per client?
	}
	return nil
}

// sendServerKey writes the magic bytes plus the server's public key as
// the frameServerKey frame.
func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
	buf := make([]byte, 0, len(magic)+key.NodePublicRawLen)
	buf = append(buf, magic...)
	buf = s.publicKey.AppendTo(buf)
	err := writeFrame(lw.bw(), frameServerKey, buf)
	lw.Flush() // redundant (no-op) flush to release bufio.Writer
	return err
}

// noteClientActivity records that c just sent something, for arbitrating
// between duplicate connections for the same key according to s.dupPolicy.
func (s *Server) noteClientActivity(c *sclient) {
	if !c.isDup.Load() {
		// Fast path for clients that aren't in a dup set.
		return
	}
	if c.isDisabled.Load() {
		// If they're already disabled, no point checking more.
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	cs, ok := s.clients[c.key]
	if !ok {
		return
	}
	dup := cs.dup
	if dup == nil {
		// It became unduped in between the isDup fast path check above
		// and the mutex check. Nothing to do.
		return
	}

	if s.dupPolicy == lastWriterIsActive {
		dup.last = c
		cs.activeClient.Store(c)
	} else if dup.last == nil {
		// If we didn't have a primary, let the current
		// speaker be the primary.
		dup.last = c
		cs.activeClient.Store(c)
	}

	if slicesx.LastEqual(dup.sendHistory, c) {
		// The client c was the last client to make activity
		// in this set and it was already recorded. Nothing to
		// do.
		return
	}

	// If we saw this connection send previously, then consider
	// the group fighting and disable them all.
	if s.dupPolicy == disableFighters {
		for _, prior := range dup.sendHistory {
			if prior == c {
				cs.ForeachClient(func(c *sclient) {
					c.isDisabled.Store(true)
					if cs.activeClient.Load() == c {
						cs.activeClient.Store(nil)
					}
				})
				break
			}
		}
	}

	// Append this client to the list of clients who spoke last.
	dup.sendHistory = append(dup.sendHistory, c)
}

// serverInfo is the JSON payload sealed to the client's key in the
// frameServerInfo frame.
type serverInfo struct {
	Version int `json:"version,omitempty"`

	TokenBucketBytesPerSecond int `json:",omitempty"`
	TokenBucketBytesBurst     int `json:",omitempty"`
}

// sendServerInfo seals and writes the serverInfo frame to the client,
// then flushes.
func (s *Server) sendServerInfo(bw *lazyBufioWriter, clientKey key.NodePublic) error {
	msg, err := json.Marshal(serverInfo{Version: ProtocolVersion})
	if err != nil {
		return err
	}

	msgbox := s.privateKey.SealTo(clientKey, msg)
	if err := writeFrameHeader(bw.bw(), frameServerInfo, uint32(len(msgbox))); err != nil {
		return err
	}
	if _, err := bw.Write(msgbox); err != nil {
		return err
	}
	return bw.Flush()
}

// recvClientKey reads the frameClientInfo frame from the client (its
// proof of identity) upon its initial connection. It should be
// considered especially untrusted at this point.
func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info *clientInfo, err error) {
	fl, err := readFrameTypeHeader(br, frameClientInfo)
	if err != nil {
		return zpub, nil, err
	}
	const minLen = keyLen + nonceLen
	if fl < minLen {
		return zpub, nil, errors.New("short client info")
	}
	// We don't trust the client at all yet, so limit its input size to limit
	// things like JSON resource exhausting (http://github.com/golang/go/issues/31789).
	if fl > 256<<10 {
		return zpub, nil, errors.New("long client info")
	}
	if err := clientKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, nil, err
	}
	msgLen := int(fl - keyLen)
	msgbox := make([]byte, msgLen)
	if _, err := io.ReadFull(br, msgbox); err != nil {
		return zpub, nil, fmt.Errorf("msgbox: %v", err)
	}
	msg, ok := s.privateKey.OpenFrom(clientKey, msgbox)
	if !ok {
		return zpub, nil, fmt.Errorf("msgbox: cannot open len=%d with client key %s", msgLen, clientKey)
	}
	info = new(clientInfo)
	if err := json.Unmarshal(msg, info); err != nil {
		return zpub, nil, fmt.Errorf("msg: %v", err)
	}
	return clientKey, info, nil
}

// recvPacket reads a send-packet frame body (dst key and payload) from br
// and updates the server's receive counters.
func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
	if frameLen < keyLen {
		return zpub, nil, errors.New("short send packet frame")
	}
	if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, nil, err
	}
	packetLen := frameLen - keyLen
	if packetLen > MaxPacketSize {
		return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
	}
	contents = make([]byte, packetLen)
	if _, err := io.ReadFull(br, contents); err != nil {
		return zpub, nil, err
	}
	s.packetsRecv.Add(1)
	s.bytesRecv.Add(int64(len(contents)))
	if disco.LooksLikeDiscoWrapper(contents) {
		s.packetsRecvDisco.Add(1)
	} else {
		s.packetsRecvOther.Add(1)
	}
	return dstKey, contents, nil
}

// zpub is the key.NodePublic zero value.
var zpub key.NodePublic

// recvForwardPacket reads a forward-packet frame body (src key, dst key,
// payload) from br. It does not read the frame header.
func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, dstKey key.NodePublic, contents []byte, err error) {
	if frameLen < keyLen*2 {
		return zpub, zpub, nil, errors.New("short send packet frame")
	}
	if err := srcKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, zpub, nil, err
	}
	if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
		return zpub, zpub, nil, err
	}
	packetLen := frameLen - keyLen*2
	if packetLen > MaxPacketSize {
		return zpub, zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
	}
	contents = make([]byte, packetLen)
	if _, err := io.ReadFull(br, contents); err != nil {
		return zpub, zpub, nil, err
	}
	// TODO: was s.packetsRecv.Add(1)
	// TODO: was s.bytesRecv.Add(int64(len(contents)))
	return srcKey, dstKey, contents, nil
}

// sclient is a client connection to the server.
//
// A node (a wireguard public key) can be connected multiple times to a DERP server
// and thus have multiple sclient instances. An sclient represents
// only one of these possibly multiple connections. See clientSet for the
// type that represents the set of all connections for a given key.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
	// Static after construction.
	connNum        int64 // process-wide unique counter, incremented each Accept
	s              *Server
	nc             Conn
	key            key.NodePublic
	info           clientInfo
	logf           logger.Logf
	done           <-chan struct{}  // closed when connection closes
	remoteIPPort   netip.AddrPort   // zero if remoteAddr is not ip:port.
	sendQueue      chan pkt         // packets queued to this client; never closed
	discoSendQueue chan pkt         // important packets queued to this client; never closed
	sendPongCh     chan [8]byte     // pong replies to send to the client; never closed
	peerGone       chan peerGoneMsg // write request that a peer is not at this server (not used by mesh peers)
	meshUpdate     chan struct{}    // write request to write peerStateChange
	canMesh        bool             // clientInfo had correct mesh token for inter-region routing
	isNotIdealConn bool             // client indicated it is not its ideal node in the region
	isDup          atomic.Bool      // whether more than 1 sclient for key is connected
	isDisabled     atomic.Bool      // whether sends to this peer are disabled due to active/active dups
	debug          bool             // turn on for verbose logging

	// Owned by run, not thread-safe.
	br          *bufio.Reader
	connectedAt time.Time
	preferred   bool

	// Owned by sendLoop, not thread-safe.
	sawSrc map[key.NodePublic]set.Handle
	bw     *lazyBufioWriter

	// Guarded by s.mu
	//
	// peerStateChange is used by mesh peers (a set of regional
	// DERP servers) and contains records that need to be sent to
	// the client for them to update their map of who's connected
	// to this node.
	peerStateChange []peerConnState

	// peerGoneLimiter limits how often the server will inform a
	// client that it's trying to establish a direct connection
	// through us with a peer we have no record of.
	peerGoneLim *rate.Limiter
}

// presentFlags reports the PeerPresentFlags bits that describe this
// connection, or PeerPresentIsRegular if none apply.
func (c *sclient) presentFlags() PeerPresentFlags {
	var f PeerPresentFlags
	if c.info.IsProber {
		f |= PeerPresentIsProber
	}
	if c.canMesh {
		f |= PeerPresentIsMeshPeer
	}
	if c.isNotIdealConn {
		f |= PeerPresentNotIdeal
	}
	if f == 0 {
		return PeerPresentIsRegular
	}
	return f
}

// peerConnState represents whether a peer is connected to the server
// or not.
-type peerConnState struct { - ipPort netip.AddrPort // if present, the peer's IP:port - peer key.NodePublic - flags PeerPresentFlags - present bool -} - -// pkt is a request to write a data frame to an sclient. -type pkt struct { - // enqueuedAt is when a packet was put onto a queue before it was sent, - // and is used for reporting metrics on the duration of packets in the queue. - enqueuedAt time.Time - - // bs is the data packet bytes. - // The memory is owned by pkt. - bs []byte - - // src is the who's the sender of the packet. - src key.NodePublic -} - -// peerGoneMsg is a request to write a peerGone frame to an sclient -type peerGoneMsg struct { - peer key.NodePublic - reason PeerGoneReasonType -} - -func (c *sclient) setPreferred(v bool) { - if c.preferred == v { - return - } - c.preferred = v - var homeMove *expvar.Int - if v { - c.s.curHomeClients.Add(1) - homeMove = &c.s.homeMovesIn - } else { - c.s.curHomeClients.Add(-1) - homeMove = &c.s.homeMovesOut - } - - // Keep track of varz for home serve moves in/out. But ignore - // the initial packet set when a client connects, which we - // assume happens within 5 seconds. In any case, just for - // graphs, so not important to miss a move. But it shouldn't: - // the netcheck/re-STUNs in magicsock only happen about every - // 30 seconds. - if c.s.clock.Since(c.connectedAt) > 5*time.Second { - homeMove.Add(1) - } -} - -// expMovingAverage returns the new moving average given the previous average, -// a new value, and an alpha decay factor. -// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average -func expMovingAverage(prev, newValue, alpha float64) float64 { - return alpha*newValue + (1-alpha)*prev -} - -// recordQueueTime updates the average queue duration metric after a packet has been sent. 
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
	elapsed := float64(c.s.clock.Since(enqueuedAt).Milliseconds())
	// Lock-free update of the shared moving average via CAS; retry
	// until our update lands on an unchanged base value.
	for {
		old := atomic.LoadUint64(c.s.avgQueueDuration)
		newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1)
		if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, old, math.Float64bits(newAvg)) {
			break
		}
	}
}

// onSendLoopDone is called when the send loop is done
// to clean up.
//
// It must only be called from the sendLoop goroutine.
func (c *sclient) onSendLoopDone() {
	// If the sender shuts down unilaterally due to an error, close so
	// that the receive loop unblocks and cleans up the rest.
	c.nc.Close()

	// Clean up watches.
	for peer, h := range c.sawSrc {
		c.s.removePeerGoneFromRegionWatcher(peer, h)
	}

	// Drain the send queue to count dropped packets
	for {
		select {
		case pkt := <-c.sendQueue:
			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
		case pkt := <-c.discoSendQueue:
			c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
		default:
			return
		}
	}

}

// sendLoop services the client's send channels (packets, pongs,
// peer-gone, mesh updates) and periodic keep-alives, flushing the
// buffered writer only when there's nothing left to batch.
func (c *sclient) sendLoop(ctx context.Context) error {
	defer c.onSendLoopDone()

	// Jitter the keep-alive period so clients that connected together
	// don't all tick in lockstep.
	jitter := rand.N(5 * time.Second)
	keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(keepAlive + jitter)
	defer keepAliveTick.Stop()

	var werr error // last write error
	inBatch := -1  // for bufferedWriteFrames
	for {
		if werr != nil {
			return werr
		}
		inBatch++
		// First, a non-blocking select (with a default) that
		// does as many non-flushing writes as possible.
		select {
		case <-ctx.Done():
			return nil
		case msg := <-c.peerGone:
			werr = c.sendPeerGone(msg.peer, msg.reason)
			continue
		case <-c.meshUpdate:
			werr = c.sendMeshUpdates()
			continue
		case msg := <-c.sendQueue:
			werr = c.sendPacket(msg.src, msg.bs)
			c.recordQueueTime(msg.enqueuedAt)
			continue
		case msg := <-c.discoSendQueue:
			werr = c.sendPacket(msg.src, msg.bs)
			c.recordQueueTime(msg.enqueuedAt)
			continue
		case msg := <-c.sendPongCh:
			werr = c.sendPong(msg)
			continue
		case <-keepAliveTickChannel:
			werr = c.sendKeepAlive()
			continue
		default:
			// Flush any writes from the 3 sends above, or from
			// the blocking loop below.
			if werr = c.bw.Flush(); werr != nil {
				return werr
			}
			if inBatch != 0 { // the first loop will almost always hit default & be size zero
				c.s.bufferedWriteFrames.Observe(float64(inBatch))
				inBatch = 0
			}
		}

		// Then a blocking select with same:
		select {
		case <-ctx.Done():
			return nil
		case msg := <-c.peerGone:
			werr = c.sendPeerGone(msg.peer, msg.reason)
		case <-c.meshUpdate:
			werr = c.sendMeshUpdates()
		case msg := <-c.sendQueue:
			werr = c.sendPacket(msg.src, msg.bs)
			c.recordQueueTime(msg.enqueuedAt)
		case msg := <-c.discoSendQueue:
			werr = c.sendPacket(msg.src, msg.bs)
			c.recordQueueTime(msg.enqueuedAt)
		case msg := <-c.sendPongCh:
			werr = c.sendPong(msg)
		case <-keepAliveTickChannel:
			werr = c.sendKeepAlive()
		}
	}
}

// setWriteDeadline sets the write deadline on the underlying conn,
// giving trusted (mesh-key) peers a longer timeout.
func (c *sclient) setWriteDeadline() {
	d := writeTimeout
	if c.canMesh {
		// Trusted peers get more tolerance.
		//
		// The "canMesh" is a bit of a misnomer; mesh peers typically run over a
		// different interface for a per-region private VPC and are not
		// throttled. But monitoring software elsewhere over the internet also
		// use the private mesh key to subscribe to connect/disconnect events
		// and might hit throttling and need more time to get the initial dump
		// of connected peers.
		d = privilegedWriteTimeout
	}
	c.nc.SetWriteDeadline(time.Now().Add(d))
}

// sendKeepAlive sends a keep-alive frame, without flushing.
func (c *sclient) sendKeepAlive() error {
	c.setWriteDeadline()
	return writeFrameHeader(c.bw.bw(), frameKeepAlive, 0)
}

// sendPong sends a pong reply, without flushing.
func (c *sclient) sendPong(data [8]byte) error {
	c.s.sentPong.Add(1)
	c.setWriteDeadline()
	if err := writeFrameHeader(c.bw.bw(), framePong, uint32(len(data))); err != nil {
		return err
	}
	_, err := c.bw.Write(data[:])
	return err
}

const (
	peerGoneFrameLen    = keyLen + 1
	peerPresentFrameLen = keyLen + 16 + 2 + 1 // 16 byte IP + 2 byte port + 1 byte flags
)

// sendPeerGone sends a peerGone frame, without flushing.
func (c *sclient) sendPeerGone(peer key.NodePublic, reason PeerGoneReasonType) error {
	switch reason {
	case PeerGoneReasonDisconnected:
		c.s.peerGoneDisconnectedFrames.Add(1)
	case PeerGoneReasonNotHere:
		c.s.peerGoneNotHereFrames.Add(1)
	}
	c.setWriteDeadline()
	data := make([]byte, 0, peerGoneFrameLen)
	data = peer.AppendTo(data)
	data = append(data, byte(reason))
	if err := writeFrameHeader(c.bw.bw(), framePeerGone, uint32(len(data))); err != nil {
		return err
	}

	_, err := c.bw.Write(data)
	return err
}

// sendPeerPresent sends a peerPresent frame, without flushing.
func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort, flags PeerPresentFlags) error {
	c.setWriteDeadline()
	if err := writeFrameHeader(c.bw.bw(), framePeerPresent, peerPresentFrameLen); err != nil {
		return err
	}
	// Payload layout: key (keyLen) | IP as 16 bytes | port (2, big endian) | flags (1).
	payload := make([]byte, peerPresentFrameLen)
	_ = peer.AppendTo(payload[:0])
	a16 := ipPort.Addr().As16()
	copy(payload[keyLen:], a16[:])
	binary.BigEndian.PutUint16(payload[keyLen+16:], ipPort.Port())
	payload[keyLen+18] = byte(flags)
	_, err := c.bw.Write(payload)
	return err
}

// sendMeshUpdates drains all mesh peerStateChange entries into the write buffer
// without flushing.
func (c *sclient) sendMeshUpdates() error {
	var lastBatch []peerConnState // memory to best effort reuse

	// takeAll returns c.peerStateChange and empties it.
	takeAll := func() []peerConnState {
		c.s.mu.Lock()
		defer c.s.mu.Unlock()
		if len(c.peerStateChange) == 0 {
			return nil
		}
		batch := c.peerStateChange
		// Don't keep an overly large buffer around for reuse.
		if cap(lastBatch) > 16 {
			lastBatch = nil
		}
		c.peerStateChange = lastBatch[:0]
		return batch
	}

	for loops := 0; ; loops++ {
		batch := takeAll()
		if len(batch) == 0 {
			c.s.meshUpdateLoopCount.Observe(float64(loops))
			return nil
		}
		c.s.meshUpdateBatchSize.Observe(float64(len(batch)))

		for _, pcs := range batch {
			var err error
			if pcs.present {
				err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags)
			} else {
				err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
			}
			if err != nil {
				return err
			}
		}
		lastBatch = batch
	}
}

// sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
// DERPv2. The bytes of contents are only valid until this function
// returns, do not retain slices.
// It does not flush its bufio.Writer.
func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) {
	defer func() {
		// Stats update.
		if err != nil {
			c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
		} else {
			c.s.packetsSent.Add(1)
			c.s.bytesSent.Add(int64(len(contents)))
		}
		c.debugLogf("sendPacket from %s: %v", srcKey.ShortString(), err)
	}()

	c.setWriteDeadline()

	withKey := !srcKey.IsZero()
	pktLen := len(contents)
	if withKey {
		pktLen += key.NodePublicRawLen
		c.noteSendFromSrc(srcKey)
	}
	if err = writeFrameHeader(c.bw.bw(), frameRecvPacket, uint32(pktLen)); err != nil {
		return err
	}
	if withKey {
		if err := srcKey.WriteRawWithoutAllocating(c.bw.bw()); err != nil {
			return err
		}
	}
	_, err = c.bw.Write(contents)
	return err
}

// noteSendFromSrc notes that we are about to write a packet
// from src to sclient.
//
// It must only be called from the sendLoop goroutine.
func (c *sclient) noteSendFromSrc(src key.NodePublic) {
	if _, ok := c.sawSrc[src]; ok {
		return
	}
	h := c.s.addPeerGoneFromRegionWatcher(src, c.onPeerGoneFromRegion)
	mak.Set(&c.sawSrc, src, h)
}

// AddPacketForwarder registers fwd as a packet forwarder for dst.
// fwd must be comparable.
func (s *Server) AddPacketForwarder(dst key.NodePublic, fwd PacketForwarder) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if prev, ok := s.clientsMesh[dst]; ok {
		if prev == fwd {
			// Duplicate registration of same forwarder. Ignore.
			return
		}
		if m, ok := prev.(*multiForwarder); ok {
			if _, ok := m.all[fwd]; ok {
				// Duplicate registration of same forwarder in set; ignore.
				return
			}
			m.add(fwd)
			return
		}
		if prev != nil {
			// Otherwise, the existing value is not a set,
			// not a dup, and not local-only (nil) so make
			// it a set. `prev` existed first, so will have higher
			// priority.
			fwd = newMultiForwarder(prev, fwd)
			s.multiForwarderCreated.Add(1)
		}
	}
	s.clientsMesh[dst] = fwd
}

// RemovePacketForwarder removes fwd as a packet forwarder for dst.
// fwd must be comparable.
-func (s *Server) RemovePacketForwarder(dst key.NodePublic, fwd PacketForwarder) { - s.mu.Lock() - defer s.mu.Unlock() - v, ok := s.clientsMesh[dst] - if !ok { - return - } - if m, ok := v.(*multiForwarder); ok { - if len(m.all) < 2 { - panic("unexpected") - } - if remain, isLast := m.deleteLocked(fwd); isLast { - // If fwd was in m and we no longer need to be a - // multiForwarder, replace the entry with the - // remaining PacketForwarder. - s.clientsMesh[dst] = remain - s.multiForwarderDeleted.Add(1) - } - return - } - if v != fwd { - s.removePktForwardOther.Add(1) - // Delete of an entry that wasn't in the - // map. Harmless, so ignore. - // (This might happen if a user is moving around - // between nodes and/or the server sent duplicate - // connection change broadcasts.) - return - } - - if _, isLocal := s.clients[dst]; isLocal { - s.clientsMesh[dst] = nil - } else { - delete(s.clientsMesh, dst) - s.notePeerGoneFromRegionLocked(dst) - } -} - -// multiForwarder is a PacketForwarder that represents a set of -// forwarding options. It's used in the rare cases that a client is -// connected to multiple DERP nodes in a region. That shouldn't really -// happen except for perhaps during brief moments while the client is -// reconfiguring, in which case we don't want to forget where the -// client is. The map value is unique connection number; the lowest -// one has been seen the longest. It's used to make sure we forward -// packets consistently to the same node and don't pick randomly. -type multiForwarder struct { - fwd syncs.AtomicValue[PacketForwarder] // preferred forwarder. - all map[PacketForwarder]uint8 // all forwarders, protected by s.mu. -} - -// newMultiForwarder creates a new multiForwarder. -// The first PacketForwarder passed to this function will be the preferred one. 
-func newMultiForwarder(fwds ...PacketForwarder) *multiForwarder { - f := &multiForwarder{all: make(map[PacketForwarder]uint8)} - f.fwd.Store(fwds[0]) - for idx, fwd := range fwds { - f.all[fwd] = uint8(idx) - } - return f -} - -// add adds a new forwarder to the map with a connection number that -// is higher than the existing ones. -func (f *multiForwarder) add(fwd PacketForwarder) { - var max uint8 - for _, v := range f.all { - if v > max { - max = v - } - } - f.all[fwd] = max + 1 -} - -// deleteLocked removes a packet forwarder from the map. It expects Server.mu to be held. -// If only one forwarder remains after the removal, it will be returned alongside a `true` boolean value. -func (f *multiForwarder) deleteLocked(fwd PacketForwarder) (_ PacketForwarder, isLast bool) { - delete(f.all, fwd) - - if fwd == f.fwd.Load() { - // The preferred forwarder has been removed, choose a new one - // based on the lowest index. - var lowestfwd PacketForwarder - var lowest uint8 - for k, v := range f.all { - if lowestfwd == nil || v < lowest { - lowestfwd = k - lowest = v - } - } - if lowestfwd != nil { - f.fwd.Store(lowestfwd) - } - } - - if len(f.all) == 1 { - for k := range f.all { - return k, true - } - } - return nil, false -} - -func (f *multiForwarder) ForwardPacket(src, dst key.NodePublic, payload []byte) error { - return f.fwd.Load().ForwardPacket(src, dst, payload) -} - -func (f *multiForwarder) String() string { - return fmt.Sprintf("", f.fwd.Load(), len(f.all)) -} - -func (s *Server) expVarFunc(f func() any) expvar.Func { - return expvar.Func(func() any { - s.mu.Lock() - defer s.mu.Unlock() - return f() - }) -} - -// ExpVar returns an expvar variable suitable for registering with expvar.Publish. 
-func (s *Server) ExpVar() expvar.Var { - m := new(metrics.Set) - m.Set("gauge_memstats_sys0", expvar.Func(func() any { return int64(s.memSys0) })) - m.Set("gauge_watchers", s.expVarFunc(func() any { return len(s.watchers) })) - m.Set("gauge_current_file_descriptors", expvar.Func(func() any { return metrics.CurrentFDs() })) - m.Set("gauge_current_connections", &s.curClients) - m.Set("gauge_current_home_connections", &s.curHomeClients) - m.Set("gauge_current_notideal_connections", &s.curClientsNotIdeal) - m.Set("gauge_clients_total", expvar.Func(func() any { return len(s.clientsMesh) })) - m.Set("gauge_clients_local", expvar.Func(func() any { return len(s.clients) })) - m.Set("gauge_clients_remote", expvar.Func(func() any { return len(s.clientsMesh) - len(s.clients) })) - m.Set("gauge_current_dup_client_keys", &s.dupClientKeys) - m.Set("gauge_current_dup_client_conns", &s.dupClientConns) - m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal) - m.Set("accepts", &s.accepts) - m.Set("bytes_received", &s.bytesRecv) - m.Set("bytes_sent", &s.bytesSent) - m.Set("packets_dropped", &s.packetsDropped) - m.Set("counter_packets_dropped_reason", &s.packetsDroppedReason) - m.Set("counter_packets_dropped_type", &s.packetsDroppedType) - m.Set("counter_packets_received_kind", &s.packetsRecvByKind) - m.Set("packets_sent", &s.packetsSent) - m.Set("packets_received", &s.packetsRecv) - m.Set("unknown_frames", &s.unknownFrames) - m.Set("home_moves_in", &s.homeMovesIn) - m.Set("home_moves_out", &s.homeMovesOut) - m.Set("got_ping", &s.gotPing) - m.Set("sent_pong", &s.sentPong) - m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames) - m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames) - m.Set("packets_forwarded_out", &s.packetsForwardedOut) - m.Set("packets_forwarded_in", &s.packetsForwardedIn) - m.Set("multiforwarder_created", &s.multiForwarderCreated) - m.Set("multiforwarder_deleted", &s.multiForwarderDeleted) - 
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther) - m.Set("sclient_write_timeouts", &s.sclientWriteTimeouts) - m.Set("average_queue_duration_ms", expvar.Func(func() any { - return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) - })) - m.Set("counter_tcp_rtt", &s.tcpRtt) - m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize) - m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount) - m.Set("counter_buffered_write_frames", s.bufferedWriteFrames) - var expvarVersion expvar.String - expvarVersion.Set(version.Long()) - m.Set("version", &expvarVersion) - return m -} - -func (s *Server) ConsistencyCheck() error { - s.mu.Lock() - defer s.mu.Unlock() - - var errs []string - - var nilMeshNotInClient int - for k, f := range s.clientsMesh { - if f == nil { - if _, ok := s.clients[k]; !ok { - nilMeshNotInClient++ - } - } - } - if nilMeshNotInClient != 0 { - errs = append(errs, fmt.Sprintf("%d s.clientsMesh keys not in s.clients", nilMeshNotInClient)) - } - - var clientNotInMesh int - for k := range s.clients { - if _, ok := s.clientsMesh[k]; !ok { - clientNotInMesh++ - } - } - if clientNotInMesh != 0 { - errs = append(errs, fmt.Sprintf("%d s.clients keys not in s.clientsMesh", clientNotInMesh)) - } - - if s.curClients.Value() != int64(len(s.clients)) { - errs = append(errs, fmt.Sprintf("expvar connections = %d != clients map says of %d", - s.curClients.Value(), - len(s.clients))) - } - - if s.verifyClientsLocalTailscaled { - errs = append(errs, "lanscaping") - } - - if len(errs) == 0 { - return nil - } - return errors.New(strings.Join(errs, ", ")) -} - -const minTimeBetweenLogs = 2 * time.Second - -// BytesSentRecv records the number of bytes that have been sent since the last traffic check -// for a given process, as well as the public key of the process sending those bytes. -type BytesSentRecv struct { - Sent uint64 - Recv uint64 - // Key is the public key of the client which sent/received these bytes. 
- Key key.NodePublic -} - -// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic. -// Separated out for ease of testing. -func parseSSOutput(raw string) map[netip.AddrPort]BytesSentRecv { - newState := map[netip.AddrPort]BytesSentRecv{} - // parse every 2 lines and get src and dst ips, and kv pairs - lines := strings.Split(raw, "\n") - for i := 0; i < len(lines); i += 2 { - ipInfo := strings.Fields(strings.TrimSpace(lines[i])) - if len(ipInfo) < 5 { - continue - } - src, err := netip.ParseAddrPort(ipInfo[4]) - if err != nil { - continue - } - stats := strings.Fields(strings.TrimSpace(lines[i+1])) - stat := BytesSentRecv{} - for _, s := range stats { - if strings.Contains(s, "bytes_sent") { - sent, err := strconv.Atoi(s[strings.Index(s, ":")+1:]) - if err == nil { - stat.Sent = uint64(sent) - } - } else if strings.Contains(s, "bytes_received") { - recv, err := strconv.Atoi(s[strings.Index(s, ":")+1:]) - if err == nil { - stat.Recv = uint64(recv) - } - } - } - newState[src] = stat - } - return newState -} - -func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) { - prevState := map[netip.AddrPort]BytesSentRecv{} - enc := json.NewEncoder(w) - for r.Context().Err() == nil { - output, err := exec.Command("ss", "-i", "-H", "-t").Output() - if err != nil { - fmt.Fprintf(w, "ss failed: %v", err) - return - } - newState := parseSSOutput(string(output)) - s.mu.Lock() - for k, next := range newState { - prev := prevState[k] - if prev.Sent < next.Sent || prev.Recv < next.Recv { - if pkey, ok := s.keyOfAddr[k]; ok { - next.Key = pkey - if err := enc.Encode(next); err != nil { - s.mu.Unlock() - return - } - } - } - } - s.mu.Unlock() - prevState = newState - if _, err := fmt.Fprintln(w); err != nil { - return - } - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - time.Sleep(minTimeBetweenLogs) - } -} - -var bufioWriterPool = &sync.Pool{ - New: func() any { - return bufio.NewWriterSize(io.Discard, 2<<10) - }, -} - -// 
lazyBufioWriter is a bufio.Writer-like wrapping writer that lazily -// allocates its actual bufio.Writer from a sync.Pool, releasing it to -// the pool upon flush. -// -// We do this to reduce memory overhead; most DERP connections are -// idle and the idle bufio.Writers were 30% of overall memory usage. -type lazyBufioWriter struct { - w io.Writer // underlying - lbw *bufio.Writer // lazy; nil means it needs an associated buffer -} - -func (w *lazyBufioWriter) bw() *bufio.Writer { - if w.lbw == nil { - w.lbw = bufioWriterPool.Get().(*bufio.Writer) - w.lbw.Reset(w.w) - } - return w.lbw -} - -func (w *lazyBufioWriter) Available() int { return w.bw().Available() } - -func (w *lazyBufioWriter) Write(p []byte) (int, error) { return w.bw().Write(p) } - -func (w *lazyBufioWriter) Flush() error { - if w.lbw == nil { - return nil - } - err := w.lbw.Flush() - - w.lbw.Reset(io.Discard) - bufioWriterPool.Put(w.lbw) - w.lbw = nil - - return err -} diff --git a/derp/derp_server_default.go b/derp/derp_server_default.go deleted file mode 100644 index 3e0b5b5e9..000000000 --- a/derp/derp_server_default.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -//go:build !linux - -package derp - -import "context" - -func (c *sclient) startStatsLoop(ctx context.Context) { - // Nothing to do - return -} diff --git a/derp/derp_server_linux.go b/derp/derp_server_linux.go deleted file mode 100644 index df25c35fe..000000000 --- a/derp/derp_server_linux.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -package derp - -import ( - "context" - "crypto/tls" - "net" - "time" -) - -func (c *sclient) startStatsLoop(ctx context.Context) { -} - -// tcpConn attempts to get the underlying *net.TCPConn from this client's -// Conn; if it cannot, then it will return nil. 
-func (c *sclient) tcpConn() *net.TCPConn { - nc := c.nc - for { - switch v := nc.(type) { - case *net.TCPConn: - return v - case *tls.Conn: - nc = v.NetConn() - default: - return nil - } - } -} - -func durationToLabel(dur time.Duration) string { - switch { - case dur <= 10*time.Millisecond: - return "10ms" - case dur <= 20*time.Millisecond: - return "20ms" - case dur <= 50*time.Millisecond: - return "50ms" - case dur <= 100*time.Millisecond: - return "100ms" - case dur <= 150*time.Millisecond: - return "150ms" - case dur <= 250*time.Millisecond: - return "250ms" - case dur <= 500*time.Millisecond: - return "500ms" - default: - return "inf" - } -} diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go index ef023071c..8cec72f2d 100644 --- a/derp/derphttp/derphttp_client.go +++ b/derp/derphttp/derphttp_client.go @@ -497,7 +497,8 @@ func (c *Client) connect(ctx context.Context, caller string) (client *derp.Clien req.Header.Set("Connection", "Upgrade") if !idealNodeInRegion && reg != nil { // This is purely informative for now (2024-07-06) for stats: - req.Header.Set(derp.IdealNodeHeader, reg.Nodes[0].Name) + const IdealNodeHeader = "Ideal-Node" + req.Header.Set(IdealNodeHeader, reg.Nodes[0].Name) // TODO(bradfitz,raggi): start a time.AfterFunc for 30m-1h or so to // dialNode(reg.Nodes[0]) and see if we can even TCP connect to it. If // so, TLS handshake it as well (which is mixed up in this massive diff --git a/derp/derphttp/derphttp_server.go b/derp/derphttp/derphttp_server.go index ed7d3d707..c838e3e1d 100644 --- a/derp/derphttp/derphttp_server.go +++ b/derp/derphttp/derphttp_server.go @@ -4,12 +4,8 @@ package derphttp import ( - "fmt" - "log" "net/http" "strings" - - "tailscale.com/derp" ) // fastStartHeader is the header (with value "1") that signals to the HTTP @@ -18,64 +14,6 @@ import ( // following its HTTP request. const fastStartHeader = "Derp-Fast-Start" -// Handler returns an http.Handler to be mounted at /derp, serving s. 
-func Handler(s *derp.Server) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - // These are installed both here and in cmd/derper. The check here - // catches both cmd/derper run with DERP disabled (STUN only mode) as - // well as DERP being run in tests with derphttp.Handler directly, - // as netcheck still assumes this replies. - switch r.URL.Path { - case "/derp/probe", "/derp/latency-check": - ProbeHandler(w, r) - return - } - - up := strings.ToLower(r.Header.Get("Upgrade")) - if up != "websocket" && up != "derp" { - if up != "" { - log.Printf("Weird upgrade: %q", up) - } - http.Error(w, "DERP requires connection upgrade", http.StatusUpgradeRequired) - return - } - - fastStart := r.Header.Get(fastStartHeader) == "1" - - h, ok := w.(http.Hijacker) - if !ok { - http.Error(w, "HTTP does not support general TCP support", 500) - return - } - - netConn, conn, err := h.Hijack() - if err != nil { - log.Printf("Hijack failed: %v", err) - http.Error(w, "HTTP does not support general TCP support", 500) - return - } - - if !fastStart { - pubKey := s.PublicKey() - fmt.Fprintf(conn, "HTTP/1.1 101 Switching Protocols\r\n"+ - "Upgrade: DERP\r\n"+ - "Connection: Upgrade\r\n"+ - "Derp-Version: %v\r\n"+ - "Derp-Public-Key: %s\r\n\r\n", - derp.ProtocolVersion, - pubKey.UntypedHexString()) - } - - if v := r.Header.Get(derp.IdealNodeHeader); v != "" { - ctx = derp.IdealNodeContextKey.WithValue(ctx, v) - } - - s.Accept(ctx, netConn, conn, netConn.RemoteAddr().String()) - }) -} - // ProbeHandler is the endpoint that clients without UDP access (including js/wasm) hit to measure // DERP latency, as a replacement for UDP STUN queries. 
func ProbeHandler(w http.ResponseWriter, r *http.Request) { diff --git a/derp/dropreason_string.go b/derp/dropreason_string.go deleted file mode 100644 index 3ad072819..000000000 --- a/derp/dropreason_string.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) Tailscale Inc & AUTHORS -// SPDX-License-Identifier: BSD-3-Clause - -// Code generated by "stringer -type=dropReason -trimprefix=dropReason"; DO NOT EDIT. - -package derp - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[dropReasonUnknownDest-0] - _ = x[dropReasonUnknownDestOnFwd-1] - _ = x[dropReasonGoneDisconnected-2] - _ = x[dropReasonQueueHead-3] - _ = x[dropReasonQueueTail-4] - _ = x[dropReasonWriteError-5] - _ = x[dropReasonDupClient-6] - _ = x[numDropReasons-7] -} - -const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneDisconnectedQueueHeadQueueTailWriteErrorDupClientnumDropReasons" - -var _dropReason_index = [...]uint8{0, 11, 27, 43, 52, 61, 71, 80, 94} - -func (i dropReason) String() string { - if i < 0 || i >= dropReason(len(_dropReason_index)-1) { - return "dropReason(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _dropReason_name[_dropReason_index[i]:_dropReason_index[i+1]] -} diff --git a/health/health.go b/health/health.go index fe7a86433..0109ebc2d 100644 --- a/health/health.go +++ b/health/health.go @@ -8,7 +8,6 @@ package health import ( "context" "errors" - "expvar" "fmt" "maps" "net/http" @@ -19,13 +18,11 @@ import ( "time" "tailscale.com/envknob" - "tailscale.com/metrics" "tailscale.com/tailcfg" "tailscale.com/types/opt" "tailscale.com/util/mak" "tailscale.com/util/multierr" "tailscale.com/util/set" - "tailscale.com/util/usermetric" "tailscale.com/version" ) @@ -110,7 +107,6 @@ type Tracker struct { lastLoginErr error localLogConfigErr error tlsConnectionErrors map[string]error // map[ServerName]error - 
metricHealthMessage *metrics.MultiLabelMap[metricHealthMessageLabel] } // Subsystem is the name of a subsystem whose health can be monitored. @@ -309,33 +305,6 @@ func (w *Warnable) IsVisible(ws *warningState) bool { return time.Since(ws.BrokenSince) >= w.TimeToVisible } -// SetMetricsRegistry sets up the metrics for the Tracker. It takes -// a usermetric.Registry and registers the metrics there. -func (t *Tracker) SetMetricsRegistry(reg *usermetric.Registry) { - if reg == nil || t.metricHealthMessage != nil { - return - } - - t.metricHealthMessage = usermetric.NewMultiLabelMapWithRegistry[metricHealthMessageLabel]( - reg, - "tailscaled_health_messages", - "gauge", - "Number of health messages broken down by type.", - ) - - t.metricHealthMessage.Set(metricHealthMessageLabel{ - Type: MetricLabelWarning, - }, expvar.Func(func() any { - if t.nil() { - return 0 - } - t.mu.Lock() - defer t.mu.Unlock() - t.updateBuiltinWarnablesLocked() - return int64(len(t.stringsLocked())) - })) -} - // SetUnhealthy sets a warningState for the given Warnable with the provided Args, and should be // called when a Warnable becomes unhealthy, or its unhealthy status needs to be updated. // SetUnhealthy takes ownership of args. 
The args can be nil if no additional information is diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 83a4be3ea..b6811b2cf 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -78,7 +78,6 @@ import ( "tailscale.com/util/syspolicy" "tailscale.com/util/testenv" "tailscale.com/util/uniq" - "tailscale.com/util/usermetric" "tailscale.com/version" "tailscale.com/version/distro" "tailscale.com/wgengine" @@ -154,7 +153,6 @@ type LocalBackend struct { statsLogf logger.Logf // for printing peers stats on change sys *tsd.System health *health.Tracker // always non-nil - metrics metrics e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys store ipn.StateStore // non-nil; TODO(bradfitz): remove; use sys dialer *tsdial.Dialer // non-nil; TODO(bradfitz): remove; use sys @@ -334,11 +332,6 @@ func (b *LocalBackend) HealthTracker() *health.Tracker { return b.health } -// UserMetricsRegistry returns the usermetrics registry for the backend -func (b *LocalBackend) UserMetricsRegistry() *usermetric.Registry { - return b.sys.UserMetricsRegistry() -} - // NetMon returns the network monitor for the backend. func (b *LocalBackend) NetMon() *netmon.Monitor { return b.sys.NetMon.Get() @@ -348,16 +341,6 @@ type updateStatus struct { started bool } -type metrics struct { - // advertisedRoutes is a metric that reports the number of network routes that are advertised by the local node. - // This informs the user of how many routes are being advertised by the local node, excluding exit routes. - advertisedRoutes *usermetric.Gauge - - // approvedRoutes is a metric that reports the number of network routes served by the local node and approved - // by the control server. - approvedRoutes *usermetric.Gauge -} - // clientGen is a func that creates a control plane client. // It's the type used by LocalBackend.SetControlClientGetterForTesting. 
type clientGen func(controlclient.Options) (controlclient.Client, error) @@ -400,13 +383,6 @@ func NewLocalBackend(logf logger.Logf, sys *tsd.System, loginFlags controlclient captiveCtx, captiveCancel := context.WithCancel(ctx) captiveCancel() - m := metrics{ - advertisedRoutes: sys.UserMetricsRegistry().NewGauge( - "tailscaled_advertised_routes", "Number of advertised network routes (e.g. by a subnet router)"), - approvedRoutes: sys.UserMetricsRegistry().NewGauge( - "tailscaled_approved_routes", "Number of approved network routes (e.g. by a subnet router)"), - } - b := &LocalBackend{ ctx: ctx, ctxCancel: cancel, @@ -415,7 +391,6 @@ func NewLocalBackend(logf logger.Logf, sys *tsd.System, loginFlags controlclient statsLogf: logger.LogOnChange(logf, 5*time.Minute, clock.Now), sys: sys, health: sys.HealthTracker(), - metrics: m, e: e, dialer: dialer, store: store, @@ -3659,8 +3634,6 @@ func (b *LocalBackend) applyPrefsToHostinfoLocked(hi *tailcfg.Hostinfo, prefs ip hi.ShieldsUp = prefs.ShieldsUp() hi.AllowsUpdate = envknob.AllowsRemoteUpdate() || prefs.AutoUpdate().Apply.EqualBool(true) - b.metrics.advertisedRoutes.Set(float64(tsaddr.WithoutExitRoute(prefs.AdvertiseRoutes()).Len())) - hi.ServicesHash = b.vipServiceHash(b.vipServicesFromPrefsLocked(prefs)) // The Hostinfo.WantIngress field tells control whether this node wants to @@ -4161,10 +4134,6 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) { if nm == nil { b.nodeByAddr = nil - - // If there is no netmap, the client is going into a "turned off" - // state so reset the metrics. 
- b.metrics.approvedRoutes.Set(0) return } @@ -4192,7 +4161,6 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) { approved++ } } - b.metrics.approvedRoutes.Set(approved) } for _, p := range nm.Peers { addNode(p) diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index 07374a55a..b1b0b07b0 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -23,7 +23,6 @@ import ( "github.com/tailscale/wireguard-go/tun" "go4.org/mem" "tailscale.com/disco" - tsmetrics "tailscale.com/metrics" "tailscale.com/net/packet" "tailscale.com/net/packet/checksum" "tailscale.com/net/tsaddr" @@ -33,7 +32,6 @@ import ( "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/util/clientmetric" - "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" "tailscale.com/wgengine/wgcfg" ) @@ -183,20 +181,6 @@ type Wrapper struct { // disableTSMPRejected disables TSMP rejected responses. For tests. disableTSMPRejected bool - - metrics *metrics -} - -type metrics struct { - inboundDroppedPacketsTotal *tsmetrics.MultiLabelMap[usermetric.DropLabels] - outboundDroppedPacketsTotal *tsmetrics.MultiLabelMap[usermetric.DropLabels] -} - -func registerMetrics(reg *usermetric.Registry) *metrics { - return &metrics{ - inboundDroppedPacketsTotal: reg.DroppedPacketsInbound(), - outboundDroppedPacketsTotal: reg.DroppedPacketsOutbound(), - } } // tunInjectedRead is an injected packet pretending to be a tun.Read(). 
@@ -227,15 +211,15 @@ func (w *Wrapper) Start() { close(w.startCh) } -func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { - return wrap(logf, tdev, true, m) +func WrapTAP(logf logger.Logf, tdev tun.Device) *Wrapper { + return wrap(logf, tdev, true) } -func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { - return wrap(logf, tdev, false, m) +func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { + return wrap(logf, tdev, false) } -func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) *Wrapper { +func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper { logf = logger.WithPrefix(logf, "tstun: ") w := &Wrapper{ disableFilter: true, // lanscaping @@ -254,7 +238,6 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) // TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets. filterFlags: filter.LogAccepts | filter.LogDrops, startCh: make(chan struct{}), - metrics: registerMetrics(m), } w.vectorBuffer = make([][]byte, tdev.BatchSize()) @@ -883,9 +866,6 @@ func (t *Wrapper) Write(buffs [][]byte, offset int) (int, error) { t.noteActivity() _, err := t.tdevWrite(buffs, offset) if err != nil { - t.metrics.inboundDroppedPacketsTotal.Add(usermetric.DropLabels{ - Reason: usermetric.ReasonError, - }, int64(len(buffs))) } return len(buffs), err } diff --git a/tsd/tsd.go b/tsd/tsd.go index ef93e2d05..f163f83b3 100644 --- a/tsd/tsd.go +++ b/tsd/tsd.go @@ -29,7 +29,6 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/net/tstun" "tailscale.com/types/netmap" - "tailscale.com/util/usermetric" "tailscale.com/wgengine" "tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/router" @@ -59,8 +58,7 @@ type System struct { controlKnobs controlknobs.Knobs - healthTracker health.Tracker - userMetricsRegistry usermetric.Registry + healthTracker health.Tracker } // NetstackImpl is the interface that *netstack.Impl implements. 
@@ -126,11 +124,6 @@ func (s *System) HealthTracker() *health.Tracker { return &s.healthTracker } -// UserMetricsRegistry returns the system usermetrics. -func (s *System) UserMetricsRegistry() *usermetric.Registry { - return &s.userMetricsRegistry -} - // SubSystem represents some subsystem of the Tailscale node daemon. // // A subsystem can be set to a value, and then later retrieved. A subsystem diff --git a/util/clientmetric/clientmetric.go b/util/clientmetric/clientmetric.go index 584a24f73..b2d356b60 100644 --- a/util/clientmetric/clientmetric.go +++ b/util/clientmetric/clientmetric.go @@ -9,7 +9,6 @@ import ( "bytes" "encoding/binary" "encoding/hex" - "expvar" "fmt" "io" "sort" @@ -17,8 +16,6 @@ import ( "sync" "sync/atomic" "time" - - "tailscale.com/util/set" ) var ( @@ -226,54 +223,6 @@ func NewGaugeFunc(name string, f func() int64) *Metric { return m } -// AggregateCounter returns a sum of expvar counters registered with it. -type AggregateCounter struct { - mu sync.RWMutex - counters set.Set[*expvar.Int] -} - -func (c *AggregateCounter) Value() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - var sum int64 - for cnt := range c.counters { - sum += cnt.Value() - } - return sum -} - -// Register registers provided expvar counter. -// When a counter is added to the counter, it will be reset -// to start counting from 0. This is to avoid incrementing the -// counter with an unexpectedly large value. -func (c *AggregateCounter) Register(counter *expvar.Int) { - c.mu.Lock() - defer c.mu.Unlock() - // No need to do anything if it's already registered. - if c.counters.Contains(counter) { - return - } - counter.Set(0) - c.counters.Add(counter) -} - -// UnregisterAll unregisters all counters resulting in it -// starting back down at zero. This is to ensure monotonicity -// and respect the semantics of the counter. 
-func (c *AggregateCounter) UnregisterAll() { - c.mu.Lock() - defer c.mu.Unlock() - c.counters = set.Set[*expvar.Int]{} -} - -// NewAggregateCounter returns a new aggregate counter that returns -// a sum of expvar variables registered with it. -func NewAggregateCounter(name string) *AggregateCounter { - c := &AggregateCounter{counters: set.Set[*expvar.Int]{}} - NewGaugeFunc(name, c.Value) - return c -} - // WritePrometheusExpositionFormat writes all client metrics to w in // the Prometheus text-based exposition format. // diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index e49b8da55..bd024931b 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -672,12 +672,6 @@ func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan if err != nil { c.logf("magicsock: derp.Send(%v): %v", wr.addr, err) metricSendDERPError.Add(1) - if !wr.isDisco { - c.metrics.outboundPacketsDroppedErrors.Add(1) - } - } else if !wr.isDisco { - c.metrics.outboundPacketsDERPTotal.Add(1) - c.metrics.outboundBytesDERPTotal.Add(int64(len(wr.b))) } } } @@ -735,8 +729,6 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en ep.noteRecvActivity(ipp, mono.Now()) - c.metrics.inboundPacketsDERPTotal.Add(1) - c.metrics.inboundBytesDERPTotal.Add(int64(n)) return n, ep } diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 4d0278f9a..2db079bbc 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -951,15 +951,6 @@ func (de *endpoint) send(buffs [][]byte) error { for _, b := range buffs { txBytes += len(b) } - - switch { - case udpAddr.Addr().Is4(): - de.c.metrics.outboundPacketsIPv4Total.Add(int64(len(buffs))) - de.c.metrics.outboundBytesIPv4Total.Add(int64(txBytes)) - case udpAddr.Addr().Is6(): - de.c.metrics.outboundPacketsIPv6Total.Add(int64(len(buffs))) - de.c.metrics.outboundBytesIPv6Total.Add(int64(txBytes)) - } } if derpAddr.IsValid() { allOk := 
true diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 26d155528..d694d150a 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -10,7 +10,6 @@ import ( "bytes" "context" "errors" - "expvar" "fmt" "io" "net" @@ -54,7 +53,6 @@ import ( "tailscale.com/util/set" "tailscale.com/util/testenv" "tailscale.com/util/uniq" - "tailscale.com/util/usermetric" "tailscale.com/wgengine/capture" "tailscale.com/wgengine/wgint" ) @@ -91,41 +89,6 @@ type pathLabel struct { Path Path } -// metrics in wgengine contains the usermetrics counters for magicsock, it -// is however a bit special. All them metrics are labeled, but looking up -// the metric everytime we need to record it has an overhead, and includes -// a lock in MultiLabelMap. The metrics are therefore instead created with -// wgengine and the underlying expvar.Int is stored to be used directly. -type metrics struct { - // inboundPacketsTotal is the total number of inbound packets received, - // labeled by the path the packet took. - inboundPacketsIPv4Total expvar.Int - inboundPacketsIPv6Total expvar.Int - inboundPacketsDERPTotal expvar.Int - - // inboundBytesTotal is the total number of inbound bytes received, - // labeled by the path the packet took. - inboundBytesIPv4Total expvar.Int - inboundBytesIPv6Total expvar.Int - inboundBytesDERPTotal expvar.Int - - // outboundPacketsTotal is the total number of outbound packets sent, - // labeled by the path the packet took. - outboundPacketsIPv4Total expvar.Int - outboundPacketsIPv6Total expvar.Int - outboundPacketsDERPTotal expvar.Int - - // outboundBytesTotal is the total number of outbound bytes sent, - // labeled by the path the packet took. - outboundBytesIPv4Total expvar.Int - outboundBytesIPv6Total expvar.Int - outboundBytesDERPTotal expvar.Int - - // outboundPacketsDroppedErrors is the total number of outbound packets - // dropped due to errors. 
- outboundPacketsDroppedErrors expvar.Int -} - // A Conn routes UDP packets and actively manages a list of its endpoints. type Conn struct { // This block mirrors the contents and field order of the Options @@ -354,9 +317,6 @@ type Conn struct { // responsibility to ensure that traffic from these endpoints is routed // to the node. staticEndpoints views.Slice[netip.AddrPort] - - // metrics contains the metrics for the magicsock instance. - metrics *metrics } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. @@ -423,9 +383,6 @@ type Options struct { // report errors and warnings to. HealthTracker *health.Tracker - // Metrics specifies the metrics registry to record metrics to. - Metrics *usermetric.Registry - // ControlKnobs are the set of control knobs to use. // If nil, they're ignored and not updated. ControlKnobs *controlknobs.Knobs @@ -531,86 +488,10 @@ func NewConn(opts Options) (*Conn, error) { SkipExternalNetwork: inTest(), } - c.metrics = registerMetrics(opts.Metrics) - c.logf("magicsock: disco key = %v", c.discoShort) return c, nil } -// registerMetrics wires up the metrics for wgengine, instead of -// registering the label metric directly, the underlying expvar is exposed. -// See metrics for more info. 
-func registerMetrics(reg *usermetric.Registry) *metrics { - pathDirectV4 := pathLabel{Path: PathDirectIPv4} - pathDirectV6 := pathLabel{Path: PathDirectIPv6} - pathDERP := pathLabel{Path: PathDERP} - inboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( - reg, - "tailscaled_inbound_packets_total", - "counter", - "Counts the number of packets received from other peers", - ) - inboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( - reg, - "tailscaled_inbound_bytes_total", - "counter", - "Counts the number of bytes received from other peers", - ) - outboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( - reg, - "tailscaled_outbound_packets_total", - "counter", - "Counts the number of packets sent to other peers", - ) - outboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( - reg, - "tailscaled_outbound_bytes_total", - "counter", - "Counts the number of bytes sent to other peers", - ) - outboundPacketsDroppedErrors := reg.DroppedPacketsOutbound() - - m := new(metrics) - - // Map clientmetrics to the usermetric counters. 
- metricRecvDataPacketsIPv4.Register(&m.inboundPacketsIPv4Total) - metricRecvDataPacketsIPv6.Register(&m.inboundPacketsIPv6Total) - metricRecvDataPacketsDERP.Register(&m.inboundPacketsDERPTotal) - metricSendUDP.Register(&m.outboundPacketsIPv4Total) - metricSendUDP.Register(&m.outboundPacketsIPv6Total) - metricSendDERP.Register(&m.outboundPacketsDERPTotal) - - inboundPacketsTotal.Set(pathDirectV4, &m.inboundPacketsIPv4Total) - inboundPacketsTotal.Set(pathDirectV6, &m.inboundPacketsIPv6Total) - inboundPacketsTotal.Set(pathDERP, &m.inboundPacketsDERPTotal) - - inboundBytesTotal.Set(pathDirectV4, &m.inboundBytesIPv4Total) - inboundBytesTotal.Set(pathDirectV6, &m.inboundBytesIPv6Total) - inboundBytesTotal.Set(pathDERP, &m.inboundBytesDERPTotal) - - outboundPacketsTotal.Set(pathDirectV4, &m.outboundPacketsIPv4Total) - outboundPacketsTotal.Set(pathDirectV6, &m.outboundPacketsIPv6Total) - outboundPacketsTotal.Set(pathDERP, &m.outboundPacketsDERPTotal) - - outboundBytesTotal.Set(pathDirectV4, &m.outboundBytesIPv4Total) - outboundBytesTotal.Set(pathDirectV6, &m.outboundBytesIPv6Total) - outboundBytesTotal.Set(pathDERP, &m.outboundBytesDERPTotal) - - outboundPacketsDroppedErrors.Set(usermetric.DropLabels{Reason: usermetric.ReasonError}, &m.outboundPacketsDroppedErrors) - - return m -} - -// deregisterMetrics unregisters the underlying usermetrics expvar counters -// from clientmetrics. -func deregisterMetrics(m *metrics) { - metricRecvDataPacketsIPv4.UnregisterAll() - metricRecvDataPacketsIPv6.UnregisterAll() - metricRecvDataPacketsDERP.UnregisterAll() - metricSendUDP.UnregisterAll() - metricSendDERP.UnregisterAll() -} - // doPeriodicSTUN is called (in a new goroutine) by // periodicReSTUNTimer when periodic STUNs are active. 
func (c *Conn) doPeriodicSTUN() { c.ReSTUN("periodic") } @@ -1108,11 +989,6 @@ func (c *Conn) networkDown() bool { return !c.networkUp.Load() } // See https://pkg.go.dev/golang.zx2c4.com/wireguard/conn#Bind.Send func (c *Conn) Send(buffs [][]byte, ep conn.Endpoint) (err error) { n := int64(len(buffs)) - defer func() { - if err != nil { - c.metrics.outboundPacketsDroppedErrors.Add(n) - } - }() metricSendData.Add(n) if c.networkDown() { metricSendDataNetworkDown.Add(n) @@ -1171,17 +1047,6 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte, isDisco bool) (sent bool, e if err != nil { metricSendUDPError.Add(1) c.maybeRebindOnError(err) - } else { - if sent && !isDisco { - switch { - case ipp.Addr().Is4(): - c.metrics.outboundPacketsIPv4Total.Add(1) - c.metrics.outboundBytesIPv4Total.Add(int64(len(b))) - case ipp.Addr().Is6(): - c.metrics.outboundPacketsIPv6Total.Add(1) - c.metrics.outboundBytesIPv6Total.Add(int64(len(b))) - } - } } return } @@ -1308,23 +1173,17 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) { } func (c *Conn) receiveIPv4() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), - &c.metrics.inboundPacketsIPv4Total, - &c.metrics.inboundBytesIPv4Total, - ) + return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4)) } // receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6. func (c *Conn) receiveIPv6() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), - &c.metrics.inboundPacketsIPv6Total, - &c.metrics.inboundBytesIPv6Total, - ) + return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6)) } // mkReceiveFunc creates a ReceiveFunc reading from ruc. // The provided healthItem and metrics are updated if non-nil. 
-func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetric, bytesMetric *expvar.Int) conn.ReceiveFunc { +func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats) conn.ReceiveFunc { // epCache caches an IPPort->endpoint for hot flows. var epCache ippEndpointCache @@ -1361,12 +1220,6 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu } ipp := msg.Addr.(*net.UDPAddr).AddrPort() if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok { - if packetMetric != nil { - packetMetric.Add(1) - } - if bytesMetric != nil { - bytesMetric.Add(int64(msg.N)) - } eps[i] = ep sizes[i] = msg.N reportToCaller = true @@ -2368,8 +2221,6 @@ func (c *Conn) Close() error { c.muCond.Wait() } - deregisterMetrics(c.metrics) - return nil } @@ -2911,17 +2762,12 @@ var ( metricSendDERPErrorChan = clientmetric.NewCounter("magicsock_send_derp_error_chan") metricSendDERPErrorClosed = clientmetric.NewCounter("magicsock_send_derp_error_closed") metricSendDERPErrorQueue = clientmetric.NewCounter("magicsock_send_derp_error_queue") - metricSendUDP = clientmetric.NewAggregateCounter("magicsock_send_udp") metricSendUDPError = clientmetric.NewCounter("magicsock_send_udp_error") - metricSendDERP = clientmetric.NewAggregateCounter("magicsock_send_derp") metricSendDERPError = clientmetric.NewCounter("magicsock_send_derp_error") // Data packets (non-disco) metricSendData = clientmetric.NewCounter("magicsock_send_data") metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down") - metricRecvDataPacketsDERP = clientmetric.NewAggregateCounter("magicsock_recv_data_derp") - metricRecvDataPacketsIPv4 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv4") - metricRecvDataPacketsIPv6 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv6") // Disco packets metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp") diff --git 
a/wgengine/userspace.go b/wgengine/userspace.go index 384eaeacf..cc37f5b0c 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -42,7 +42,6 @@ import ( "tailscale.com/util/mak" "tailscale.com/util/set" "tailscale.com/util/testenv" - "tailscale.com/util/usermetric" "tailscale.com/version" "tailscale.com/wgengine/filter" "tailscale.com/wgengine/magicsock" @@ -173,10 +172,6 @@ type Config struct { // HealthTracker, if non-nil, is the health tracker to use. HealthTracker *health.Tracker - // Metrics is the usermetrics registry to use. - // Mandatory, if not set, an error is returned. - Metrics *usermetric.Registry - // Dialer is the dialer to use for outbound connections. // If nil, a new Dialer is created. Dialer *tsdial.Dialer @@ -227,8 +222,6 @@ func NewFakeUserspaceEngine(logf logger.Logf, opts ...any) (Engine, error) { conf.ControlKnobs = v case *health.Tracker: conf.HealthTracker = v - case *usermetric.Registry: - conf.Metrics = v default: return nil, fmt.Errorf("unknown option type %T", v) } @@ -247,10 +240,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) panic("NewUserspaceEngine called without HealthTracker (being strict in tests)") } - if conf.Metrics == nil { - return nil, errors.New("NewUserspaceEngine: opts.Metrics is required, please pass a *usermetric.Registry") - } - if conf.Tun == nil { logf("[v1] using fake (no-op) tun device") conf.Tun = tstun.NewFake() @@ -265,9 +254,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) var tsTUNDev *tstun.Wrapper if conf.IsTAP { - tsTUNDev = tstun.WrapTAP(logf, conf.Tun, conf.Metrics) + tsTUNDev = tstun.WrapTAP(logf, conf.Tun) } else { - tsTUNDev = tstun.Wrap(logf, conf.Tun, conf.Metrics) + tsTUNDev = tstun.Wrap(logf, conf.Tun) } closePool.add(tsTUNDev) @@ -357,7 +346,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) NoteRecvActivity: e.noteRecvActivity, NetMon: e.netMon, HealthTracker: e.health, - 
Metrics: conf.Metrics, ControlKnobs: conf.ControlKnobs, OnPortUpdate: onPortUpdate, PeerByKeyFunc: e.PeerByKey,