mirror of
https://github.com/tailscale/tailscale.git
synced 2026-05-05 20:26:47 +02:00
ipn/local: add netmap mutations to the ipn bus updates tailscale/tailscale#1909 This adds a new new NotifyWatchOpt that allows watchers to receive PeerChange events (derived from node mutations) on the IPN bus in lieu of a complete netmap. We'll continue to send the full netmap for any map response that includes it, but for mutations, sending PeerChange events gives the client the option to manage it's own models more selectively and cuts way down on json serialization overhead. On chatty tailnets, this will vastly reduce the amount of chatter on the bus. This change should be backwards compatible, it is purely additive. Clients that subscribe to NotifyNetmap will get the full netmap for every delta. New clients can omit that and instead opt into NotifyPeerChanges. Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
322 lines
8.0 KiB
Go
322 lines
8.0 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package ipnlocal
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"slices"
|
|
"testing"
|
|
"time"
|
|
|
|
"tailscale.com/drive"
|
|
"tailscale.com/ipn"
|
|
"tailscale.com/tailcfg"
|
|
"tailscale.com/tstest"
|
|
"tailscale.com/tstime"
|
|
"tailscale.com/types/logger"
|
|
"tailscale.com/types/netmap"
|
|
"tailscale.com/types/views"
|
|
)
|
|
|
|
func TestIsNotableNotify(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
notify *ipn.Notify
|
|
want bool
|
|
}{
|
|
{"nil", nil, false},
|
|
{"empty", &ipn.Notify{}, false},
|
|
{"version", &ipn.Notify{Version: "foo"}, false},
|
|
{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
|
|
{"peerchanges", &ipn.Notify{PeerChanges: []*tailcfg.PeerChange{{}}}, false},
|
|
{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
|
|
}
|
|
|
|
// Then for all other fields, assume they're notable.
|
|
// We use reflect to catch fields that might be added in the future without
|
|
// remembering to update the [isNotableNotify] function.
|
|
rt := reflect.TypeFor[ipn.Notify]()
|
|
for sf := range rt.Fields() {
|
|
n := &ipn.Notify{}
|
|
switch sf.Name {
|
|
case "_", "NetMap", "PeerChanges", "Engine", "Version":
|
|
// Already covered above or not applicable.
|
|
continue
|
|
case "DriveShares":
|
|
n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
|
|
default:
|
|
rf := reflect.ValueOf(n).Elem().FieldByIndex(sf.Index)
|
|
switch rf.Kind() {
|
|
case reflect.Pointer:
|
|
rf.Set(reflect.New(rf.Type().Elem()))
|
|
case reflect.String:
|
|
rf.SetString("foo")
|
|
case reflect.Slice:
|
|
rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
|
|
default:
|
|
t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
|
|
}
|
|
}
|
|
|
|
tests = append(tests, struct {
|
|
name string
|
|
notify *ipn.Notify
|
|
want bool
|
|
}{
|
|
name: "field-" + sf.Name,
|
|
notify: n,
|
|
want: true,
|
|
})
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
if got := isNotableNotify(tt.notify); got != tt.want {
|
|
t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
type rateLimitingBusSenderTester struct {
|
|
tb testing.TB
|
|
got []*ipn.Notify
|
|
clock *tstest.Clock
|
|
s *rateLimitingBusSender
|
|
}
|
|
|
|
func (st *rateLimitingBusSenderTester) init() {
|
|
if st.s != nil {
|
|
return
|
|
}
|
|
st.clock = tstest.NewClock(tstest.ClockOpts{
|
|
Start: time.Unix(1731777537, 0), // time I wrote this test :)
|
|
})
|
|
st.s = &rateLimitingBusSender{
|
|
clock: tstime.DefaultClock{Clock: st.clock},
|
|
fn: func(n *ipn.Notify) bool {
|
|
st.got = append(st.got, n)
|
|
return true
|
|
},
|
|
}
|
|
}
|
|
|
|
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
|
|
st.tb.Helper()
|
|
st.init()
|
|
if !st.s.send(n) {
|
|
st.tb.Fatal("unexpected send failed")
|
|
}
|
|
}
|
|
|
|
func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
|
|
st.tb.Helper()
|
|
st.clock.Advance(d)
|
|
select {
|
|
case <-st.s.flushChan():
|
|
if !st.s.flush() {
|
|
st.tb.Fatal("unexpected flush failed")
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
|
|
func TestRateLimitingBusSender(t *testing.T) {
|
|
nm1 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
|
|
nm2 := &ipn.Notify{NetMap: new(netmap.NetworkMap)}
|
|
eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
|
|
eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
|
|
|
|
t.Run("unbuffered", func(t *testing.T) {
|
|
st := &rateLimitingBusSenderTester{tb: t}
|
|
st.send(nm1)
|
|
st.send(nm2)
|
|
st.send(eng1)
|
|
st.send(eng2)
|
|
if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
|
|
t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
|
|
}
|
|
})
|
|
|
|
t.Run("buffered", func(t *testing.T) {
|
|
st := &rateLimitingBusSenderTester{tb: t}
|
|
st.init()
|
|
st.s.interval = 1 * time.Second
|
|
st.send(&ipn.Notify{Version: "initial"})
|
|
if len(st.got) != 1 {
|
|
t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
|
|
}
|
|
st.send(nm1)
|
|
st.send(nm2)
|
|
st.send(eng1)
|
|
st.send(eng2)
|
|
if len(st.got) != 1 {
|
|
if len(st.got) != 1 {
|
|
t.Fatalf("got %d items; expected still just that first 1", len(st.got))
|
|
}
|
|
}
|
|
|
|
// But moving the clock should flush the rest, collasced into one new one.
|
|
st.advance(5 * time.Second)
|
|
if len(st.got) != 2 {
|
|
t.Fatalf("got %d items; want 2", len(st.got))
|
|
}
|
|
gotn := st.got[1]
|
|
if gotn.NetMap != nm2.NetMap {
|
|
t.Errorf("got wrong NetMap; got %p", gotn.NetMap)
|
|
}
|
|
if gotn.Engine != eng2.Engine {
|
|
t.Errorf("got wrong Engine; got %p", gotn.Engine)
|
|
}
|
|
if t.Failed() {
|
|
t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
|
|
}
|
|
})
|
|
|
|
// Test the Run method
|
|
t.Run("run", func(t *testing.T) {
|
|
st := &rateLimitingBusSenderTester{tb: t}
|
|
st.init()
|
|
st.s.interval = 1 * time.Second
|
|
st.s.lastFlush = st.clock.Now() // pretend we just flushed
|
|
|
|
flushc := make(chan *ipn.Notify, 1)
|
|
st.s.fn = func(n *ipn.Notify) bool {
|
|
flushc <- n
|
|
return true
|
|
}
|
|
didSend := make(chan bool, 2)
|
|
st.s.didSendTestHook = func() { didSend <- true }
|
|
waitSend := func() {
|
|
select {
|
|
case <-didSend:
|
|
case <-time.After(5 * time.Second):
|
|
t.Error("timeout waiting for call to send")
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
incoming := make(chan *ipn.Notify, 2)
|
|
go func() {
|
|
incoming <- nm1
|
|
waitSend()
|
|
incoming <- nm2
|
|
waitSend()
|
|
st.advance(5 * time.Second)
|
|
select {
|
|
case n := <-flushc:
|
|
if n.NetMap != nm2.NetMap {
|
|
t.Errorf("got wrong NetMap; got %p", n.NetMap)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Error("timeout")
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
st.s.Run(ctx, incoming)
|
|
})
|
|
}
|
|
|
|
func TestMergePeerChanges(t *testing.T) {
|
|
online := true
|
|
offline := false
|
|
|
|
t.Run("no_overlap_appends", func(t *testing.T) {
|
|
old := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 1},
|
|
}
|
|
new := []*tailcfg.PeerChange{
|
|
{NodeID: 2, DERPRegion: 2},
|
|
}
|
|
got := mergePeerChanges(old, new)
|
|
if len(got) != 2 {
|
|
t.Fatalf("len = %d; want 2", len(got))
|
|
}
|
|
if got[0].NodeID != 1 || got[1].NodeID != 2 {
|
|
t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID)
|
|
}
|
|
})
|
|
|
|
t.Run("overlap_merges", func(t *testing.T) {
|
|
old := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 1, Online: &online},
|
|
{NodeID: 2, DERPRegion: 10},
|
|
}
|
|
new := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 5, Online: &offline},
|
|
}
|
|
got := mergePeerChanges(old, new)
|
|
if len(got) != 2 {
|
|
t.Fatalf("len = %d; want 2 (merged, not appended)", len(got))
|
|
}
|
|
if got[0].DERPRegion != 5 {
|
|
t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion)
|
|
}
|
|
if *got[0].Online != false {
|
|
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
|
|
}
|
|
// Node 2 should be untouched.
|
|
if got[1].NodeID != 2 || got[1].DERPRegion != 10 {
|
|
t.Errorf("node 2 was modified unexpectedly")
|
|
}
|
|
})
|
|
|
|
t.Run("partial_overlap_merges_and_appends", func(t *testing.T) {
|
|
old := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 1},
|
|
}
|
|
new := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 2},
|
|
{NodeID: 3, DERPRegion: 30},
|
|
}
|
|
got := mergePeerChanges(old, new)
|
|
if len(got) != 2 {
|
|
t.Fatalf("len = %d; want 2", len(got))
|
|
}
|
|
if got[0].NodeID != 1 || got[0].DERPRegion != 2 {
|
|
t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion)
|
|
}
|
|
if got[1].NodeID != 3 || got[1].DERPRegion != 30 {
|
|
t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion)
|
|
}
|
|
})
|
|
|
|
t.Run("preserves_old_fields_on_merge", func(t *testing.T) {
|
|
old := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10},
|
|
}
|
|
new := []*tailcfg.PeerChange{
|
|
{NodeID: 1, Online: &offline},
|
|
}
|
|
got := mergePeerChanges(old, new)
|
|
if len(got) != 1 {
|
|
t.Fatalf("len = %d; want 1", len(got))
|
|
}
|
|
if got[0].DERPRegion != 1 {
|
|
t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion)
|
|
}
|
|
if got[0].Cap != 10 {
|
|
t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap)
|
|
}
|
|
if *got[0].Online != false {
|
|
t.Errorf("Online = %v; want false (from new)", *got[0].Online)
|
|
}
|
|
})
|
|
|
|
t.Run("nil_old", func(t *testing.T) {
|
|
new := []*tailcfg.PeerChange{
|
|
{NodeID: 1, DERPRegion: 1},
|
|
}
|
|
got := mergePeerChanges(nil, new)
|
|
if len(got) != 1 {
|
|
t.Fatalf("len = %d; want 1", len(got))
|
|
}
|
|
if got[0].NodeID != 1 {
|
|
t.Errorf("NodeID = %d; want 1", got[0].NodeID)
|
|
}
|
|
})
|
|
}
|