tailscale/ipn/ipnlocal/bus_test.go
Jonathan Nobels 03c3551ee5
ipn/ipnlocal: add netmap mutations to the ipn bus (#19120)
ipn/local: add netmap mutations to the ipn bus

updates tailscale/tailscale#1909

This adds a new new NotifyWatchOpt that allows watchers to
receive PeerChange events (derived from node mutations)
on the IPN bus in lieu of a complete netmap.  We'll continue
to send the full netmap for any map response that includes it,
but for  mutations, sending PeerChange events gives the client
the option to manage it's own models more selectively and cuts
way down on json serialization overhead.

On chatty tailnets, this will vastly reduce the amount of
chatter on the bus.

This change should be backwards compatible, it is
purely additive.  Clients that subscribe to NotifyNetmap will
get the full netmap for every delta.  New clients can
omit that and instead opt into NotifyPeerChanges.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
2026-04-09 15:45:41 -04:00

322 lines
8.0 KiB
Go

// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package ipnlocal
import (
"context"
"reflect"
"slices"
"testing"
"time"
"tailscale.com/drive"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstime"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
"tailscale.com/types/views"
)
func TestIsNotableNotify(t *testing.T) {
tests := []struct {
name string
notify *ipn.Notify
want bool
}{
{"nil", nil, false},
{"empty", &ipn.Notify{}, false},
{"version", &ipn.Notify{Version: "foo"}, false},
{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
{"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false},
{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
}
// Then for all other fields, assume they're notable.
// We use reflect to catch fields that might be added in the future without
// remembering to update the [isNotableNotify] function.
rt := reflect.TypeFor[ipn.Notify]()
for sf := range rt.Fields() {
n := &ipn.Notify{}
switch sf.Name {
case "_", "NetMap", "PeerChanges", "Engine", "Version":
// Already covered above or not applicable.
continue
case "DriveShares":
n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
default:
rf := reflect.ValueOf(n).Elem().FieldByIndex(sf.Index)
switch rf.Kind() {
case reflect.Pointer:
rf.Set(reflect.New(rf.Type().Elem()))
case reflect.String:
rf.SetString("foo")
case reflect.Slice:
rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
default:
t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
}
}
tests = append(tests, struct {
name string
notify *ipn.Notify
want bool
}{
name: "field-" + sf.Name,
notify: n,
want: true,
})
}
for _, tt := range tests {
if got := isNotableNotify(tt.notify); got != tt.want {
t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
}
}
}
type rateLimitingBusSenderTester struct {
tb testing.TB
got []*ipn.Notify
clock *tstest.Clock
s *rateLimitingBusSender
}
func (st *rateLimitingBusSenderTester) init() {
if st.s != nil {
return
}
st.clock = tstest.NewClock(tstest.ClockOpts{
Start: time.Unix(1731777537, 0), // time I wrote this test :)
})
st.s = &rateLimitingBusSender{
clock: tstime.DefaultClock{Clock: st.clock},
fn: func(n *ipn.Notify) bool {
st.got = append(st.got, n)
return true
},
}
}
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
st.tb.Helper()
st.init()
if !st.s.send(n) {
st.tb.Fatal("unexpected send failed")
}
}
func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
st.tb.Helper()
st.clock.Advance(d)
select {
case <-st.s.flushChan():
if !st.s.flush() {
st.tb.Fatal("unexpected flush failed")
}
default:
}
}
func TestRateLimitingBusSender(t *testing.T) {
nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
t.Run("unbuffered", func(t *testing.T) {
st := &rateLimitingBusSenderTester{tb: t}
st.send(nm1)
st.send(nm2)
st.send(eng1)
st.send(eng2)
if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
}
})
t.Run("buffered", func(t *testing.T) {
st := &rateLimitingBusSenderTester{tb: t}
st.init()
st.s.interval = 1 * time.Second
st.send(&ipn.Notify{Version: "initial"})
if len(st.got) != 1 {
t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
}
st.send(nm1)
st.send(nm2)
st.send(eng1)
st.send(eng2)
if len(st.got) != 1 {
if len(st.got) != 1 {
t.Fatalf("got %d items; expected still just that first 1", len(st.got))
}
}
// But moving the clock should flush the rest, collasced into one new one.
st.advance(5 * time.Second)
if len(st.got) != 2 {
t.Fatalf("got %d items; want 2", len(st.got))
}
gotn := st.got[1]
if gotn.NetMap != nm2.NetMap {
t.Errorf("got wrong NetMap; got %p", gotn.NetMap)
}
if gotn.Engine != eng2.Engine {
t.Errorf("got wrong Engine; got %p", gotn.Engine)
}
if t.Failed() {
t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
}
})
// Test the Run method
t.Run("run", func(t *testing.T) {
st := &rateLimitingBusSenderTester{tb: t}
st.init()
st.s.interval = 1 * time.Second
st.s.lastFlush = st.clock.Now() // pretend we just flushed
flushc := make(chan *ipn.Notify, 1)
st.s.fn = func(n *ipn.Notify) bool {
flushc <- n
return true
}
didSend := make(chan bool, 2)
st.s.didSendTestHook = func() { didSend <- true }
waitSend := func() {
select {
case <-didSend:
case <-time.After(5 * time.Second):
t.Error("timeout waiting for call to send")
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
incoming := make(chan *ipn.Notify, 2)
go func() {
incoming <- nm1
waitSend()
incoming <- nm2
waitSend()
st.advance(5 * time.Second)
select {
case n := <-flushc:
if n.NetMap != nm2.NetMap {
t.Errorf("got wrong NetMap; got %p", n.NetMap)
}
case <-time.After(10 * time.Second):
t.Error("timeout")
}
cancel()
}()
st.s.Run(ctx, incoming)
})
}
func TestMergePeerChanges(t *testing.T) {
online := true
offline := false
t.Run("no_overlap_appends", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
new := []*tailcfg.PeerChange{
{NodeID: 2, DERPRegion: 2},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2", len(got))
}
if got[0].NodeID != 1 || got[1].NodeID != 2 {
t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID)
}
})
t.Run("overlap_merges", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1, Online: &online},
{NodeID: 2, DERPRegion: 10},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 5, Online: &offline},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2 (merged, not appended)", len(got))
}
if got[0].DERPRegion != 5 {
t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion)
}
if *got[0].Online != false {
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
}
// Node 2 should be untouched.
if got[1].NodeID != 2 || got[1].DERPRegion != 10 {
t.Errorf("node 2 was modified unexpectedly")
}
})
t.Run("partial_overlap_merges_and_appends", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 2},
{NodeID: 3, DERPRegion: 30},
}
got := mergePeerChanges(old, new)
if len(got) != 2 {
t.Fatalf("len = %d; want 2", len(got))
}
if got[0].NodeID != 1 || got[0].DERPRegion != 2 {
t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion)
}
if got[1].NodeID != 3 || got[1].DERPRegion != 30 {
t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion)
}
})
t.Run("preserves_old_fields_on_merge", func(t *testing.T) {
old := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10},
}
new := []*tailcfg.PeerChange{
{NodeID: 1, Online: &offline},
}
got := mergePeerChanges(old, new)
if len(got) != 1 {
t.Fatalf("len = %d; want 1", len(got))
}
if got[0].DERPRegion != 1 {
t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion)
}
if got[0].Cap != 10 {
t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap)
}
if *got[0].Online != false {
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
}
})
t.Run("nil_old", func(t *testing.T) {
new := []*tailcfg.PeerChange{
{NodeID: 1, DERPRegion: 1},
}
got := mergePeerChanges(nil, new)
if len(got) != 1 {
t.Fatalf("len = %d; want 1", len(got))
}
if got[0].NodeID != 1 {
t.Errorf("NodeID = %d; want 1", got[0].NodeID)
}
})
}