From ad5436af0d57f0473a2555d47e712afe4d360fbe Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 26 Apr 2026 22:48:05 +0000 Subject: [PATCH] tstest/largetailnet, tstest/integration/testcontrol: add in-process large-tailnet benchmark Add a Go benchmark that exercises a single tailnet client (a [tsnet.Server] running in the test process) against a synthetic large initial netmap and a stream of caller-driven peer add/remove deltas, all in-process. The harness is split in two parts: - tstest/largetailnet, a reusable package containing a [Streamer] that hijacks the map long-poll on a [testcontrol.Server] via the new AltMapStream hook, sends one initial MapResponse with N synthetic peers, and forwards caller-supplied delta MapResponses on the same stream. Helpers like MakePeer / AllocPeer build synthetic peers with unique IDs and addresses derived from the Tailscale ULA range. - tstest/largetailnet/largetailnet_test.go, BenchmarkGiantTailnet (headless tailscaled workload, no IPN bus subscriber) and BenchmarkGiantTailnetBusWatcher (GUI-client workload with one Notify subscriber attached). Both are gated on --actually-test-giant-tailnet (skipped by default), stand up an in-process testcontrol + tsnet.Server, let Up block until the initial N-peer netmap has been processed, then ResetTimer and run add+remove pairs via b.Loop. Per-delta sync is via a test-only [ipnlocal.LocalBackend.AwaitNodeKeyForTest] channel that closes once the just-added peer key appears in the netmap (no-watcher variant) or via bus-Notify drain (bus-watcher variant). To support the hijack, [testcontrol.Server] grows an AltMapStream hook and a small MapStreamWriter interface for benchmarks/stress tests that need to drive a controlled MapResponse sequence; the normal serveMap path is untouched when AltMapStream is nil. The streamer answers non-streaming "lite" map polls (which controlclient issues before the streaming long-poll to push HostInfo) with an empty MapResponse and returns immediately, so the streaming poll that follows is the one that gets the initial netmap. The benchmark is intended for before/after comparisons of netmap- and delta-handling changes targeted at large tailnets. CPU profiles on unmodified main show the expected O(N) hotspots: setControlClientStatusLocked / authReconfigLocked / userspaceEngine.Reconfig / setNetMapLocked, plus JSON encoding of the full Notify.NetMap to bus watchers (which dominates the BusWatcher variant). Median ms/op over 10 runs on unmodified main, by tailnet size N: N no-watcher bus-watcher 10000 32 166 50000 222 865 100000 504 1765 250000 1551 4696 Recommended invocation: go test ./tstest/largetailnet/ -run=^$ \ -bench='BenchmarkGiantTailnet(BusWatcher)?$' \ -benchtime=2000x -timeout=10m \ --actually-test-giant-tailnet \ --giant-tailnet-n=250000 \ -cpuprofile=/tmp/giant.cpu.pprof Updates #12542 Change-Id: I4f5b2bb271a36ba853d5a0ffe82054ef2b15c585 Signed-off-by: Brad Fitzpatrick --- ipn/ipnlocal/local.go | 8 + ipn/ipnlocal/node_backend.go | 45 +++ tsnet/tsnet.go | 13 + tstest/integration/testcontrol/testcontrol.go | 53 +++- tstest/largetailnet/largetailnet.go | 265 ++++++++++++++++++ tstest/largetailnet/largetailnet_test.go | 218 ++++++++++++++ 6 files changed, 601 insertions(+), 1 deletion(-) create mode 100644 tstest/largetailnet/largetailnet.go create mode 100644 tstest/largetailnet/largetailnet_test.go diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 654a78e24..31354aba6 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -2471,6 +2471,14 @@ func (b *LocalBackend) PeersForTest() []tailcfg.NodeView { return b.currentNode().PeersForTest() } +// AwaitNodeKeyForTest returns a channel that is closed once a peer with the +// given node key first appears in the current netmap. If the peer is already +// present, the returned channel is already closed. See +// [nodeBackend.AwaitNodeKeyForTest]. +func (b *LocalBackend) AwaitNodeKeyForTest(k key.NodePublic) <-chan struct{} { + return b.currentNode().AwaitNodeKeyForTest(k) +} + func (b *LocalBackend) getNewControlClientFuncLocked() clientGen { if b.ccGen == nil { // Initialize it rather than just returning the diff --git a/ipn/ipnlocal/node_backend.go b/ipn/ipnlocal/node_backend.go index 6c5db0e0d..b0e84ae7c 100644 --- a/ipn/ipnlocal/node_backend.go +++ b/ipn/ipnlocal/node_backend.go @@ -29,6 +29,7 @@ import ( "tailscale.com/util/eventbus" "tailscale.com/util/mak" "tailscale.com/util/slicesx" + "tailscale.com/util/testenv" "tailscale.com/wgengine/filter" ) @@ -107,6 +108,12 @@ type nodeBackend struct { // nodeByKey is an index of node public key to node ID for fast lookups. // It is mutated in place (with mu held) and must not escape the [nodeBackend]. nodeByKey map[key.NodePublic]tailcfg.NodeID + + // keyWaitersForTest is the test-only registry of channels waiting for + // a given peer key to first appear in the netmap. See + // [nodeBackend.AwaitNodeKeyForTest]. It is populated lazily and remains + // nil in production, where no test installs a waiter. + keyWaitersForTest map[key.NodePublic]chan struct{} } func newNodeBackend(ctx context.Context, logf logger.Logf, bus *eventbus.Bus) *nodeBackend { @@ -421,6 +428,7 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) { nb.updateNodeByAddrLocked() nb.updateNodeByKeyLocked() nb.updatePeersLocked() + nb.signalKeyWaitersForTestLocked() if nm != nil { nb.derpMapViewPub.Publish(nm.DERPMap.View()) } else { @@ -428,6 +436,43 @@ func (nb *nodeBackend) SetNetMap(nm *netmap.NetworkMap) { } } +// AwaitNodeKeyForTest returns a channel that is closed once a peer with the +// given node key first appears in this nodeBackend's peer index, or +// immediately (a closed channel) if it's already present. It is intended for +// in-process benchmarks that drive synthetic netmap deltas and need a +// zero-overhead signal that the client has applied a delta, replacing +// poll-based [local.Client.WhoIsNodeKey] loops in tests. It panics outside +// of tests. +func (nb *nodeBackend) AwaitNodeKeyForTest(k key.NodePublic) <-chan struct{} { + testenv.AssertInTest() + nb.mu.Lock() + defer nb.mu.Unlock() + if _, ok := nb.nodeByKey[k]; ok { + return syncs.ClosedChan() + } + if ch, ok := nb.keyWaitersForTest[k]; ok { + return ch + } + ch := make(chan struct{}) + mak.Set(&nb.keyWaitersForTest, k, ch) + return ch +} + +// signalKeyWaitersForTestLocked closes any waiter channels whose keys now +// appear in nb.nodeByKey. It is cheap when there are no waiters, which is +// the common case in production. It is called from [nodeBackend.SetNetMap] +// after the per-key index has been rebuilt. +// +// Caller must hold nb.mu. +func (nb *nodeBackend) signalKeyWaitersForTestLocked() { + for k, ch := range nb.keyWaitersForTest { + if _, ok := nb.nodeByKey[k]; ok { + close(ch) + delete(nb.keyWaitersForTest, k) + } + } +} + func (nb *nodeBackend) updateNodeByAddrLocked() { nm := nb.netMap if nm == nil { diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index cc03cdbb6..4d6318018 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -403,6 +403,19 @@ func (s *Server) LocalClient() (*local.Client, error) { return s.localClient, nil } +// TestHooks are hooks meant for internal-testing only; they're not stable +// or documented, intentionally. +var TestHooks testHooks + +type testHooks struct{} + +// LocalBackend returns the [ipnlocal.LocalBackend] backing s. It panics +// outside of tests. +func (testHooks) LocalBackend(s *Server) *ipnlocal.LocalBackend { + testenv.AssertInTest() + return s.lb +} + // Loopback starts a routing server on a loopback address. // // The server has multiple functions. diff --git a/tstest/integration/testcontrol/testcontrol.go b/tstest/integration/testcontrol/testcontrol.go index f16fc89b5..97a193c51 100644 --- a/tstest/integration/testcontrol/testcontrol.go +++ b/tstest/integration/testcontrol/testcontrol.go @@ -89,6 +89,9 @@ type Server struct { // MapResponse stream to modify the first MapResponse sent in response to it. ModifyFirstMapResponse func(*tailcfg.MapResponse, *tailcfg.MapRequest) + // AltMapStream, if non-nil, takes over serveMap. See [AltMapStreamFunc]. + AltMapStream AltMapStreamFunc + initMuxOnce sync.Once mux *http.ServeMux @@ -1144,6 +1147,15 @@ func (s *Server) serveMap(w http.ResponseWriter, r *http.Request, mkey key.Machi go panic(fmt.Sprintf("bad map request: %v", err)) } + if s.AltMapStream != nil { + // The caller takes over the stream entirely; it must handle + // keeping the HTTP response alive until ctx is done. + compress := req.Compress != "" + w.WriteHeader(200) + s.AltMapStream(ctx, &mapStreamSender{s: s, w: w, compress: compress}, req) + return + } + jitter := rand.N(8 * time.Second) keepAlive := 50*time.Second + jitter @@ -1486,12 +1498,51 @@ func (s *Server) takeRawMapMessage(nk key.NodePublic) (mapResJSON []byte, ok boo return mapResJSON, true } +// AltMapStreamFunc is the type of [Server.AltMapStream]: a callback that +// takes over the serveMap handler entirely. The callback hand-builds and +// sends MapResponses via the provided [MapStreamWriter] and is responsible +// for keeping the stream alive until ctx is done. When set, the normal +// per-node map-stream state machine in serveMap is bypassed. +// +// The callback is invoked for every map long-poll, including the +// non-streaming "lite" polls controlclient issues to push HostInfo updates +// (req.Stream == false). Implementations that only care about the streaming +// long-poll typically respond to non-streaming polls with an empty +// MapResponse and return immediately. +// +// This hook is for benchmarks and stress tests that need to drive clients +// with a controlled sequence of responses. +type AltMapStreamFunc func(ctx context.Context, w MapStreamWriter, req *tailcfg.MapRequest) + +// MapStreamWriter is the interface passed to an [AltMapStreamFunc], +// letting the callback write framed MapResponse messages directly onto the +// long-poll HTTP response. +type MapStreamWriter interface { + // SendMapMessage encodes and writes msg as a single framed + // MapResponse on the stream. It respects the client's Compress flag + // (captured when the stream started). + SendMapMessage(msg *tailcfg.MapResponse) error +} + +// mapStreamSender implements [MapStreamWriter] for [Server.AltMapStream] +// callbacks. +type mapStreamSender struct { + s *Server + w http.ResponseWriter + compress bool +} + +func (m *mapStreamSender) SendMapMessage(msg *tailcfg.MapResponse) error { + return m.s.sendMapMsg(m.w, m.compress, msg) +} + func (s *Server) sendMapMsg(w http.ResponseWriter, compress bool, msg any) error { resBytes, err := s.encode(compress, msg) if err != nil { return err } - if len(resBytes) > 16<<20 { + const maxMapSize = 256 << 20 // 256MB + if len(resBytes) > maxMapSize { return fmt.Errorf("map message too big: %d", len(resBytes)) } var siz [4]byte diff --git a/tstest/largetailnet/largetailnet.go b/tstest/largetailnet/largetailnet.go new file mode 100644 index 000000000..73ec2da80 --- /dev/null +++ b/tstest/largetailnet/largetailnet.go @@ -0,0 +1,265 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +// Package largetailnet provides reusable building blocks for in-process +// benchmarks and stress tests that drive a single tailnet client (typically a +// [tsnet.Server]) with a synthetic large-tailnet MapResponse stream. +// +// A [Streamer] takes over the map long-poll on a [testcontrol.Server] via the +// AltMapStream hook: it sends one initial MapResponse announcing the self +// node and N synthetic peers, and then forwards caller-supplied delta +// MapResponses on the same stream until ctx is done. +// +// The package is designed so that a benchmark can: +// +// - Build a [Streamer] with the desired peer count. +// - Stand up a [testcontrol.Server] with the streamer's [Streamer.AltMapStream] +// installed. +// - Stand up a [tsnet.Server] pointed at the testcontrol; its Up call +// blocks until the initial netmap has been processed. +// - Reset the benchmark timer and drive add/remove deltas with +// [Streamer.SendDelta] and [Streamer.AllocPeer]. +package largetailnet + +import ( + "context" + cryptorand "crypto/rand" + "fmt" + "net/netip" + "sync/atomic" + "time" + + "go4.org/mem" + "tailscale.com/net/tsaddr" + "tailscale.com/tailcfg" + "tailscale.com/tstest/integration/testcontrol" + "tailscale.com/types/key" +) + +// SelfUserID is the synthetic [tailcfg.UserID] assigned to the self node and +// to every initial peer produced by [Streamer]. Tests that build their own +// peers via [MakePeer] should pass this value. +const SelfUserID tailcfg.UserID = 1_000_000 + +// Streamer drives a controlled MapResponse stream to a single client via +// [testcontrol.Server.AltMapStream]. It synthesizes an initial netmap with N +// peers and forwards caller-supplied delta MapResponses on the same stream. +// +// A Streamer is single-shot: it expects exactly one map long-poll over its +// lifetime and is not safe for re-use across multiple clients. +type Streamer struct { + n int + derpMap *tailcfg.DERPMap + + started chan struct{} // closed when the alt-map-stream callback first fires + initialDone chan struct{} // closed after initial MapResponse has been written + deltas chan *tailcfg.MapResponse + + // nextID is the next free node ID. It starts at N+2 (1 is the self + // node, 2..N+1 are the initial peers) and is bumped by AllocPeer. + nextID atomic.Int64 +} + +// New constructs a Streamer that will produce an initial netmap with n peers +// and a self node when its AltMapStream callback first fires. derpMap is +// included verbatim in the initial MapResponse. +func New(n int, derpMap *tailcfg.DERPMap) *Streamer { + s := &Streamer{ + n: n, + derpMap: derpMap, + started: make(chan struct{}), + initialDone: make(chan struct{}), + // Buffered so a benchmark loop body that does send-then-wait + // doesn't block on the channel under steady state. + deltas: make(chan *tailcfg.MapResponse, 64), + } + s.nextID.Store(int64(n) + 2) + return s +} + +// AltMapStream returns a callback suitable for [testcontrol.Server.AltMapStream]. +// On the first streaming long-poll it sends the initial big MapResponse and +// then forwards deltas enqueued via [Streamer.SendDelta] until ctx is done. +// Non-streaming "lite" polls are answered with an empty MapResponse so they +// complete quickly. The streamer is single-shot: any later streaming polls +// are kept alive but produce no further messages. +func (s *Streamer) AltMapStream() testcontrol.AltMapStreamFunc { + return func(ctx context.Context, w testcontrol.MapStreamWriter, req *tailcfg.MapRequest) { + if !req.Stream { + _ = w.SendMapMessage(&tailcfg.MapResponse{}) + return + } + + select { + case <-s.started: + // Re-poll after the original stream ended. Keep the + // connection alive so the client doesn't churn. + <-ctx.Done() + return + default: + close(s.started) + } + + if err := s.sendInitial(w, req); err != nil { + // Make the failure loud rather than wedging the + // caller's [tsnet.Server.Up] on a silent retry loop. + panic(fmt.Sprintf("largetailnet: sendInitial: %v", err)) + } + close(s.initialDone) + + for { + select { + case <-ctx.Done(): + return + case mr := <-s.deltas: + if err := w.SendMapMessage(mr); err != nil { + <-ctx.Done() + return + } + } + } + } +} + +// AwaitInitialSent blocks until the initial big MapResponse has been written +// to the wire. Note this is not the same as "the client has finished +// processing it"; for that, callers should rely on [tsnet.Server.Up] +// returning, or watch the IPN bus. +func (s *Streamer) AwaitInitialSent(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.initialDone: + return nil + } +} + +// SendDelta enqueues mr for delivery on the active MapResponse stream. It +// blocks if the internal queue is full or the stream hasn't started yet. +func (s *Streamer) SendDelta(ctx context.Context, mr *tailcfg.MapResponse) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.deltas <- mr: + return nil + } +} + +// AllocPeer returns a fresh synthetic peer node with a never-before-used +// [tailcfg.NodeID]. It's intended for use in PeersChanged deltas. +func (s *Streamer) AllocPeer() *tailcfg.Node { + return MakePeer(tailcfg.NodeID(s.nextID.Add(1)-1), SelfUserID) +} + +// SelfNodeID returns the [tailcfg.NodeID] used for the self node in the +// initial netmap. +func (s *Streamer) SelfNodeID() tailcfg.NodeID { return 1 } + +// sendInitial writes the big initial MapResponse with s.n peers. +func (s *Streamer) sendInitial(w testcontrol.MapStreamWriter, req *tailcfg.MapRequest) error { + selfNodeID := s.SelfNodeID() + selfIP4 := node4(selfNodeID) + selfIP6 := node6(selfNodeID) + + peers := make([]*tailcfg.Node, 0, s.n) + for i := 0; i < s.n; i++ { + peers = append(peers, MakePeer(tailcfg.NodeID(i+2), SelfUserID)) + } + + now := time.Now().UTC() + selfNode := &tailcfg.Node{ + ID: selfNodeID, + StableID: "largetailnet-self", + Name: "self.largetailnet.ts.net.", + User: SelfUserID, + Key: req.NodeKey, + KeyExpiry: now.Add(24 * time.Hour), + Machine: randMachineKey(), // fake; client doesn't verify + DiscoKey: req.DiscoKey, + MachineAuthorized: true, + Addresses: []netip.Prefix{selfIP4, selfIP6}, + AllowedIPs: []netip.Prefix{selfIP4, selfIP6}, + CapMap: map[tailcfg.NodeCapability][]tailcfg.RawMessage{}, + } + + initial := &tailcfg.MapResponse{ + KeepAlive: false, + Node: selfNode, + DERPMap: s.derpMap, + Peers: peers, + PacketFilter: []tailcfg.FilterRule{{ + // Accept-all filter so the client isn't logging packet-filter + // failures; this is a benchmark harness, not a security test. + SrcIPs: []string{"*"}, + DstPorts: []tailcfg.NetPortRange{{IP: "*", Ports: tailcfg.PortRangeAny}}, + }}, + DNSConfig: &tailcfg.DNSConfig{}, + Domain: "largetailnet.ts.net", + UserProfiles: []tailcfg.UserProfile{{ + ID: SelfUserID, + LoginName: "largetailnet@example.com", + DisplayName: "largetailnet", + }}, + ControlTime: &now, + } + return w.SendMapMessage(initial) +} + +// MakePeer constructs a synthetic [tailcfg.Node] for the given NodeID and +// UserID. The peer's node/disco/machine keys are derived from random bytes +// via the *PublicFromRaw32 constructors rather than via key.New*().Public(), +// which avoids the per-peer Curve25519 ScalarBaseMult and lets the harness +// construct hundreds of thousands of peers in a few hundred milliseconds. +// The client never crypto-validates these keys in the bench, so opaque +// random bytes are sufficient. +func MakePeer(nid tailcfg.NodeID, user tailcfg.UserID) *tailcfg.Node { + v4, v6 := node4(nid), node6(nid) + name := fmt.Sprintf("peer-%d", nid) + return &tailcfg.Node{ + ID: nid, + StableID: tailcfg.StableNodeID(name), + Name: name + ".largetailnet.ts.net.", + Key: randNodeKey(), + MachineAuthorized: true, + DiscoKey: randDiscoKey(), + Machine: randMachineKey(), + Addresses: []netip.Prefix{v4, v6}, + AllowedIPs: []netip.Prefix{v4, v6}, + User: user, + // Hostinfo must be non-nil: LocalBackend.populatePeerStatus + // dereferences it via HostinfoView.Hostname unconditionally. + Hostinfo: (&tailcfg.Hostinfo{Hostname: name}).View(), + } +} + +func randNodeKey() key.NodePublic { + var b [32]byte + cryptorand.Read(b[:]) + return key.NodePublicFromRaw32(mem.B(b[:])) +} + +func randDiscoKey() key.DiscoPublic { + var b [32]byte + cryptorand.Read(b[:]) + return key.DiscoPublicFromRaw32(mem.B(b[:])) +} + +func randMachineKey() key.MachinePublic { + var b [32]byte + cryptorand.Read(b[:]) + return key.MachinePublicFromRaw32(mem.B(b[:])) +} + +func node4(nid tailcfg.NodeID) netip.Prefix { + return netip.PrefixFrom( + netip.AddrFrom4([4]byte{100, 100 + byte(nid>>16), byte(nid >> 8), byte(nid)}), + 32) +} + +func node6(nid tailcfg.NodeID) netip.Prefix { + a := tsaddr.TailscaleULARange().Addr().As16() + a[13] = byte(nid >> 16) + a[14] = byte(nid >> 8) + a[15] = byte(nid) + return netip.PrefixFrom(netip.AddrFrom16(a), 128) +} diff --git a/tstest/largetailnet/largetailnet_test.go b/tstest/largetailnet/largetailnet_test.go new file mode 100644 index 000000000..07f67df82 --- /dev/null +++ b/tstest/largetailnet/largetailnet_test.go @@ -0,0 +1,218 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +package largetailnet_test + +import ( + "context" + "flag" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "tailscale.com/ipn/store/mem" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" + "tailscale.com/tstest/integration" + "tailscale.com/tstest/integration/testcontrol" + "tailscale.com/tstest/largetailnet" + "tailscale.com/types/logger" +) + +// tsnet.Server.Up handles the wait-for-ipn.Running step itself: it +// subscribes to the IPN bus with NotifyInitialState and blocks until State +// reaches ipn.Running, which by definition means a netmap has been applied. +// We don't redo that work here. + +var ( + flagActuallyTest = flag.Bool("actually-test-giant-tailnet", false, + "if set, run the BenchmarkGiantTailnet* benchmarks; otherwise they are skipped") + flagN = flag.Int("giant-tailnet-n", 250_000, + "size of the initial netmap (peer count) for BenchmarkGiantTailnet*") + flagBenchVerbose = flag.Bool("giant-tailnet-verbose", false, + "if set, log tsnet output and DERP setup to stderr") +) + +// BenchmarkGiantTailnet measures the per-delta CPU cost of a tailnet client +// processing peer-add/peer-remove deltas in steady state, with no IPN bus +// subscribers attached. This represents the headless-tailscaled workload +// (Linux subnet routers, container sidecars, ...) where the LocalBackend +// does not pay for fanning Notify.NetMap out to GUI watchers. +// +// Use [BenchmarkGiantTailnetBusWatcher] for the GUI-client workload. +// +// The benchmark is opt-in via --actually-test-giant-tailnet. +func BenchmarkGiantTailnet(b *testing.B) { + if !*flagActuallyTest { + b.Skip("set --actually-test-giant-tailnet to run this benchmark") + } + benchGiantTailnet(b, false) +} + +// BenchmarkGiantTailnetBusWatcher is like [BenchmarkGiantTailnet] but +// attaches one [local.Client.WatchIPNBus] subscriber for the duration of the +// benchmark. The Notify-fan-out cost (notably Notify.NetMap encoding to +// every watcher on every full-rebuild path) is therefore included in the +// per-delta measurement, which approximates the GUI-client workload. +// +// The benchmark is opt-in via --actually-test-giant-tailnet. +func BenchmarkGiantTailnetBusWatcher(b *testing.B) { + if !*flagActuallyTest { + b.Skip("set --actually-test-giant-tailnet to run this benchmark") + } + benchGiantTailnet(b, true) +} + +// benchGiantTailnet is the shared body of the BenchmarkGiantTailnet* +// benchmarks. Setup is entirely in-process: a [testcontrol.Server] hosts +// the control plane, a [tsnet.Server] hosts the client, and a +// [largetailnet.Streamer] hijacks the map long-poll to drive an exact +// MapResponse sequence. +// +// Each loop iteration sends one [tailcfg.MapResponse] with PeersChanged +// (a fresh peer) and PeersRemoved (the previous fresh peer), then waits +// for the client to apply it. Net peer count stays at flagN throughout the +// loop. +// +// The wait mechanism differs by variant: +// +// - busWatcher=false: block on a channel returned by +// [ipnlocal.LocalBackend.AwaitNodeKeyForTest] (reached via +// [tsnet.TestHooks]). The channel is closed by LocalBackend the moment +// the just-added peer's key appears in the netmap, so the wait has zero +// polling overhead. +// - busWatcher=true: drain Notify events from the bus subscription, since +// a Notify firing is exactly the side-effect we want to amortize into +// the per-delta measurement. +// +// Recommended invocation for profiling on unmodified main: +// +// go test ./tstest/largetailnet/ -run=^$ \ +// -bench='BenchmarkGiantTailnet(BusWatcher)?$' \ +// -benchtime=2000x -timeout=10m \ +// --actually-test-giant-tailnet \ +// --giant-tailnet-n=250000 \ +// -cpuprofile=/tmp/giant.cpu.pprof +func benchGiantTailnet(b *testing.B, busWatcher bool) { + logf := logger.Discard + if *flagBenchVerbose { + logf = b.Logf + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + b.Cleanup(cancel) + + derpMap := integration.RunDERPAndSTUN(b, logf, "127.0.0.1") + + streamer := largetailnet.New(*flagN, derpMap) + + ctrl := &testcontrol.Server{ + DERPMap: derpMap, + DNSConfig: &tailcfg.DNSConfig{}, + AltMapStream: streamer.AltMapStream(), + Logf: logf, + } + ctrl.HTTPTestServer = httptest.NewUnstartedServer(ctrl) + ctrl.HTTPTestServer.Start() + b.Cleanup(ctrl.HTTPTestServer.Close) + controlURL := ctrl.HTTPTestServer.URL + b.Logf("testcontrol listening on %s", controlURL) + + tmp := filepath.Join(b.TempDir(), "tsnet") + if err := os.MkdirAll(tmp, 0755); err != nil { + b.Fatal(err) + } + + s := &tsnet.Server{ + Dir: tmp, + ControlURL: controlURL, + Hostname: "largetailnet-bench", + Store: new(mem.Store), + Ephemeral: true, + Logf: logf, + } + b.Cleanup(func() { s.Close() }) + + // tsnet.Server.Up blocks until the backend reaches Running, which + // requires the initial flagN-peer MapResponse to have been processed. + upStart := time.Now() + if _, err := s.Up(ctx); err != nil { + b.Fatalf("tsnet.Server.Up: %v", err) + } + b.Logf("initial %d-peer netmap processed in %v", *flagN, time.Since(upStart)) + + lc, err := s.LocalClient() + if err != nil { + b.Fatalf("LocalClient: %v", err) + } + lb := tsnet.TestHooks.LocalBackend(s) + + var notifyCh chan struct{} + if busWatcher { + bw, err := lc.WatchIPNBus(ctx, 0) + if err != nil { + b.Fatalf("WatchIPNBus: %v", err) + } + b.Cleanup(func() { bw.Close() }) + notifyCh = make(chan struct{}, 1024) + go func() { + for { + n, err := bw.Next() + if err != nil { + return + } + if n.NetMap != nil || len(n.PeerChanges) > 0 { + select { + case notifyCh <- struct{}{}: + default: + } + } + } + }() + } + + var prevAdded *tailcfg.Node + runtime.GC() + + b.ResetTimer() + for b.Loop() { + added := streamer.AllocPeer() + mr := &tailcfg.MapResponse{ + PeersChanged: []*tailcfg.Node{added}, + } + if prevAdded != nil { + mr.PeersRemoved = []tailcfg.NodeID{prevAdded.ID} + } + prevAdded = added + + if err := streamer.SendDelta(ctx, mr); err != nil { + b.Fatalf("SendDelta: %v", err) + } + + if busWatcher { + // A Notify firing is itself part of the workload we + // want to measure on this variant. + select { + case <-notifyCh: + case <-time.After(10 * time.Second): + b.Fatal("timed out waiting for notify") + case <-ctx.Done(): + b.Fatalf("ctx done waiting for notify: %v", ctx.Err()) + } + } else { + // Block on the LocalBackend's test-only signal that + // the just-added peer key has landed in the netmap. + // No polling, no notify fan-out cost. + select { + case <-lb.AwaitNodeKeyForTest(added.Key): + case <-time.After(10 * time.Second): + b.Fatalf("timed out waiting for node key %v", added.Key) + case <-ctx.Done(): + b.Fatalf("ctx done waiting for node key: %v", ctx.Err()) + } + } + } +}