tailscale/ipn/ipnlocal/bus_test.go
Brad Fitzpatrick a6c5d23742 ipn, ipn/ipnlocal: add Notify.SelfChange
Add a new bus signal that lets reactive consumers (containerboot, kube
agents, sniproxy, tsconsensus, etc.) observe self-node updates without
having to subscribe to the full netmap. Today those consumers either
watch Notify.NetMap (which on large tailnets is expensive to encode and
ship per watcher) or poll. SelfChange is a cheap, narrow alternative
that carries just the updated self node: addresses, name, key expiry,
capabilities, etc.
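
A consumer then looks something like this (sketch; watchSelf is a
hypothetical helper, and lc is the existing bus watcher client from
tailscale.com/client/local):

    func watchSelf(ctx context.Context, lc *local.Client) error {
        w, err := lc.WatchIPNBus(ctx, 0)
        if err != nil {
            return err
        }
        defer w.Close()
        for {
            n, err := w.Next()
            if err != nil {
                return err
            }
            if n.SelfChange != nil {
                // Self node changed; no full netmap was decoded.
                log.Printf("self: %s (key expiry %v)",
                    n.SelfChange.Name, n.SelfChange.KeyExpiry)
            }
        }
    }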

Consumers that need additional state can react to SelfChange and then
fetch the relevant bits on demand via existing LocalClient methods.
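
For example (sketch; both methods already exist on LocalClient):

    st, err := lc.StatusWithoutPeers(ctx) // status minus the peer map
    prefs, err := lc.GetPrefs(ctx)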

Producer-side, every netmap-bearing setControlClientStatus call now
also publishes SelfChange. Future changes will migrate individual
in-tree consumers off Notify.NetMap to this signal, and eventually
gate the legacy NetMap emission to platforms whose host GUIs still
require it.
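
Roughly, the publish site has this shape (sketch, not the exact code):

    if nm != nil && nm.SelfNode.Valid() {
        b.send(ipn.Notify{SelfChange: nm.SelfNode.AsStruct()})
    }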

Updates #12542

Change-Id: I4441650b0e085d663eb6bf26a03748b7d961ca49
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2026-04-30 14:47:03 -07:00

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package ipnlocal

import (
"context"
"reflect"
"slices"
"testing"
"time"
"tailscale.com/drive"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstime"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
"tailscale.com/types/views"
)
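
// TestIsNotableNotify checks which ipn.Notify fields count as "notable",
// i.e. important enough that the rate-limiting bus sender should deliver
// the notification immediately rather than coalesce it.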
func TestIsNotableNotify(t *testing.T) {
	tests := []struct {
		name   string
		notify *ipn.Notify
		want   bool
	}{
		{"nil", nil, false},
		{"empty", &ipn.Notify{}, false},
		{"version", &ipn.Notify{Version: "foo"}, false},
		{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
		{"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false},
		{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
		{"selfchange", &ipn.Notify{SelfChange: &tailcfg.Node{}}, true},
	}

	// Then for all other fields, assume they're notable.
	// We use reflect to catch fields that might be added in the future without
	// remembering to update the [isNotableNotify] function.
	rt := reflect.TypeFor[ipn.Notify]()
	for i := range rt.NumField() {
		sf := rt.Field(i)
		n := &ipn.Notify{}
		switch sf.Name {
		case "_", "NetMap", "PeerChanges", "SelfChange", "Engine", "Version":
			// Already covered above or not applicable.
			continue
		case "DriveShares":
			n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
		default:
			rf := reflect.ValueOf(n).Elem().FieldByIndex(sf.Index)
			switch rf.Kind() {
			case reflect.Pointer:
				rf.Set(reflect.New(rf.Type().Elem()))
			case reflect.String:
				rf.SetString("foo")
			case reflect.Slice:
				rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
			default:
				t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
			}
		}
		tests = append(tests, struct {
			name   string
			notify *ipn.Notify
			want   bool
		}{
			name:   "field-" + sf.Name,
			notify: n,
			want:   true,
		})
	}

	for _, tt := range tests {
		if got := isNotableNotify(tt.notify); got != tt.want {
			t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
		}
	}
}
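
// rateLimitingBusSenderTester is a test harness around rateLimitingBusSender:
// it drives the sender with a fake clock and records every Notify the sender
// delivers.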
type rateLimitingBusSenderTester struct {
	tb    testing.TB
	got   []*ipn.Notify
	clock *tstest.Clock
	s     *rateLimitingBusSender
}
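
// init lazily constructs the fake clock and the sender under test, arranging
// for delivered notifications to be appended to st.got.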
func (st *rateLimitingBusSenderTester) init() {
	if st.s != nil {
		return
	}
	st.clock = tstest.NewClock(tstest.ClockOpts{
		Start: time.Unix(1731777537, 0), // time I wrote this test :)
	})
	st.s = &rateLimitingBusSender{
		clock: tstime.DefaultClock{Clock: st.clock},
		fn: func(n *ipn.Notify) bool {
			st.got = append(st.got, n)
			return true
		},
	}
}
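
// send passes n to the sender under test, failing the test if the send
// reports failure.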
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
	st.tb.Helper()
	st.init()
	if !st.s.send(n) {
		st.tb.Fatal("unexpected send failed")
	}
}
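
// advance moves the fake clock forward by d and, if that causes the sender's
// flush timer to fire, flushes the buffered notification.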
func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
	st.tb.Helper()
	st.clock.Advance(d)
	select {
	case <-st.s.flushChan():
		if !st.s.flush() {
			st.tb.Fatal("unexpected flush failed")
		}
	default:
	}
}

func TestRateLimitingBusSender(t *testing.T) {
	nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
	nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
	eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
	eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}

	t.Run("unbuffered", func(t *testing.T) {
		st := &rateLimitingBusSenderTester{tb: t}
		st.send(nm1)
		st.send(nm2)
		st.send(eng1)
		st.send(eng2)
		if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
			t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
		}
	})
t.Run("buffered", func(t *testing.T) {
st := &rateLimitingBusSenderTester{tb: t}
st.init()
st.s.interval = 1 * time.Second
st.send(&ipn.Notify{Version: "initial"})
if len(st.got) != 1 {
t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
}
st.send(nm1)
st.send(nm2)
st.send(eng1)
st.send(eng2)
if len(st.got) != 1 {
if len(st.got) != 1 {
t.Fatalf("got %d items; expected still just that first 1", len(st.got))
}
}
// But moving the clock should flush the rest, collasced into one new one.
st.advance(5 * time.Second)
if len(st.got) != 2 {
t.Fatalf("got %d items; want 2", len(st.got))
}
gotn := st.got[1]
if gotn.NetMap != nm2.NetMap {
t.Errorf("got wrong NetMap; got %p", gotn.NetMap)
}
if gotn.Engine != eng2.Engine {
t.Errorf("got wrong Engine; got %p", gotn.Engine)
}
if t.Failed() {
t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
}
})

	// Test the Run method.
	t.Run("run", func(t *testing.T) {
		st := &rateLimitingBusSenderTester{tb: t}
		st.init()
		st.s.interval = 1 * time.Second
		st.s.lastFlush = st.clock.Now() // pretend we just flushed

		flushc := make(chan *ipn.Notify, 1)
		st.s.fn = func(n *ipn.Notify) bool {
			flushc <- n
			return true
		}
		didSend := make(chan bool, 2)
		st.s.didSendTestHook = func() { didSend <- true }
		waitSend := func() {
			select {
			case <-didSend:
			case <-time.After(5 * time.Second):
				t.Error("timeout waiting for call to send")
			}
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		incoming := make(chan *ipn.Notify, 2)
		go func() {
			incoming <- nm1
			waitSend()
			incoming <- nm2
			waitSend()
			st.advance(5 * time.Second)
			select {
			case n := <-flushc:
				if n.NetMap != nm2.NetMap {
					t.Errorf("got wrong NetMap; got %p", n.NetMap)
				}
			case <-time.After(10 * time.Second):
				t.Error("timeout")
			}
			cancel()
		}()

		st.s.Run(ctx, incoming)
	})
}
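
// TestMergePeerChanges exercises mergePeerChanges, which folds a batch of new
// tailcfg.PeerChange values into an existing batch: changes for an already
// present NodeID are merged in place (fields set in the new change win, unset
// fields keep their old values), and changes for new NodeIDs are appended.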
func TestMergePeerChanges(t *testing.T) {
	online := true
	offline := false

	t.Run("no_overlap_appends", func(t *testing.T) {
		old := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		new := []*tailcfg.PeerChange{
			{NodeID: 2, DERPRegion: 2},
		}
		got := mergePeerChanges(old, new)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2", len(got))
		}
		if got[0].NodeID != 1 || got[1].NodeID != 2 {
			t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID)
		}
	})

	t.Run("overlap_merges", func(t *testing.T) {
		old := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1, Online: &online},
			{NodeID: 2, DERPRegion: 10},
		}
		new := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 5, Online: &offline},
		}
		got := mergePeerChanges(old, new)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2 (merged, not appended)", len(got))
		}
		if got[0].DERPRegion != 5 {
			t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion)
		}
		if *got[0].Online != false {
			t.Errorf("Online = %v; want false (from new)", *got[0].Online)
		}
		// Node 2 should be untouched.
		if got[1].NodeID != 2 || got[1].DERPRegion != 10 {
			t.Errorf("node 2 was modified unexpectedly")
		}
	})

	t.Run("partial_overlap_merges_and_appends", func(t *testing.T) {
		old := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		new := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 2},
			{NodeID: 3, DERPRegion: 30},
		}
		got := mergePeerChanges(old, new)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2", len(got))
		}
		if got[0].NodeID != 1 || got[0].DERPRegion != 2 {
			t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion)
		}
		if got[1].NodeID != 3 || got[1].DERPRegion != 30 {
			t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion)
		}
	})

	t.Run("preserves_old_fields_on_merge", func(t *testing.T) {
		old := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10},
		}
		new := []*tailcfg.PeerChange{
			{NodeID: 1, Online: &offline},
		}
		got := mergePeerChanges(old, new)
		if len(got) != 1 {
			t.Fatalf("len = %d; want 1", len(got))
		}
		if got[0].DERPRegion != 1 {
			t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion)
		}
		if got[0].Cap != 10 {
			t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap)
		}
		if *got[0].Online != false {
			t.Errorf("Online = %v; want false (from new)", *got[0].Online)
		}
	})

	t.Run("nil_old", func(t *testing.T) {
		new := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		got := mergePeerChanges(nil, new)
		if len(got) != 1 {
			t.Fatalf("len = %d; want 1", len(got))
		}
		if got[0].NodeID != 1 {
			t.Errorf("NodeID = %d; want 1", got[0].NodeID)
		}
	})
}