From ccbf115875de67774812e834d08bcaab9a87cb2b Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 12 Sep 2025 09:55:39 -0700 Subject: [PATCH] appc: publish events for route updates and storage Nothing subscribes to them yet, and we need tests. Add a Close method to the AppConnector to make sure the client gets cleaned up when the connector is dropped (we re-create connectors). Change-Id: I184670ba2fb920e0d2cb2be7c6816259bca77afe --- appc/appconnector.go | 55 +++++++++++++++++++++++++++++++++++---- appc/appconnector_test.go | 17 +++++++++++- ipn/ipnlocal/local.go | 4 +++ 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/appc/appconnector.go b/appc/appconnector.go index 69a22aac5..2e645eabb 100644 --- a/appc/appconnector.go +++ b/appc/appconnector.go @@ -12,6 +12,7 @@ package appc import ( "context" "fmt" + "maps" "net/netip" "slices" "strings" @@ -129,6 +130,13 @@ type RouteInfo struct { Wildcards []string `json:",omitempty"` } +// RouteUpdate records a set of routes that should be advertised and a set of +// routes that should be unadvertised in event bus updates. +type RouteUpdate struct { + Advertise []netip.Prefix + Unadvertise []netip.Prefix +} + // AppConnector is an implementation of an AppConnector that performs // its function as a subsystem inside of a tailscale node. At the control plane // side App Connector routing is configured in terms of domains rather than IP @@ -143,6 +151,9 @@ type AppConnector struct { logf logger.Logf eventBus *eventbus.Bus routeAdvertiser RouteAdvertiser + pubClient *eventbus.Client + updatePub *eventbus.Publisher[RouteUpdate] + storePub *eventbus.Publisher[RouteInfo] // storeRoutesFunc will be called to persist routes if it is not nil. storeRoutesFunc func(*RouteInfo) error @@ -200,10 +211,14 @@ func NewAppConnector(c Config) *AppConnector { case c.RouteAdvertiser == nil: panic("missing route advertiser") } + ec := c.EventBus.Client("appc.AppConnector") ac := &AppConnector{ logf: logger.WithPrefix(c.Logf, "appc: "), eventBus: c.EventBus, + pubClient: ec, + updatePub: eventbus.Publish[RouteUpdate](ec), + storePub: eventbus.Publish[RouteInfo](ec), routeAdvertiser: c.RouteAdvertiser, storeRoutesFunc: c.StoreRoutesFunc, } @@ -230,6 +245,14 @@ func (e *AppConnector) ShouldStoreRoutes() bool { // storeRoutesLocked takes the current state of the AppConnector and persists it func (e *AppConnector) storeRoutesLocked() error { + if e.storePub.ShouldPublish() { + e.storePub.Publish(RouteInfo{ + // Clone here, as the subscriber will handle these outside our lock. + Control: slices.Clone(e.controlRoutes), + Domains: maps.Clone(e.domains), + Wildcards: slices.Clone(e.wildcards), + }) + } if !e.ShouldStoreRoutes() { return nil } @@ -242,6 +265,7 @@ func (e *AppConnector) storeRoutesLocked() error { e.writeRateMinute.update(numRoutes) e.writeRateDay.update(numRoutes) + // TODO(creachdair): Remove this once it's delivered over the event bus. return e.storeRoutesFunc(&RouteInfo{ Control: e.controlRoutes, Domains: e.domains, @@ -285,6 +309,18 @@ func (e *AppConnector) Wait(ctx context.Context) { e.queue.Wait(ctx) } +// Close closes the connector and cleans up resources associated with it. +// It is safe (and a noop) to call Close on nil. +func (e *AppConnector) Close() { + if e == nil { + return + } + e.mu.Lock() + defer e.mu.Unlock() + e.queue.Shutdown() // TODO(creachadair): Should we wait for it too? + e.pubClient.Close() +} + func (e *AppConnector) updateDomains(domains []string) { e.mu.Lock() defer e.mu.Unlock() @@ -325,11 +361,15 @@ func (e *AppConnector) updateDomains(domains []string) { toRemove = append(toRemove, netip.PrefixFrom(a, a.BitLen())) } } - e.queue.Add(func() { - if err := e.routeAdvertiser.UnadvertiseRoute(toRemove...); err != nil { - e.logf("failed to unadvertise routes on domain removal: %v: %v: %v", slicesx.MapKeys(oldDomains), toRemove, err) - } - }) + + if len(toRemove) != 0 { + e.queue.Add(func() { + if err := e.routeAdvertiser.UnadvertiseRoute(toRemove...); err != nil { + e.logf("failed to unadvertise routes on domain removal: %v: %v: %v", slicesx.MapKeys(oldDomains), toRemove, err) + } + }) + e.updatePub.Publish(RouteUpdate{Unadvertise: toRemove}) + } } e.logf("handling domains: %v and wildcards: %v", slicesx.MapKeys(e.domains), e.wildcards) @@ -379,6 +419,10 @@ nextRoute: e.logf("failed to unadvertise routes: %v: %v", toRemove, err) } }) + e.updatePub.Publish(RouteUpdate{ + Advertise: routes, + Unadvertise: toRemove, + }) e.controlRoutes = routes if err := e.storeRoutesLocked(); err != nil { @@ -584,6 +628,7 @@ func (e *AppConnector) scheduleAdvertisement(domain string, routes ...netip.Pref e.logf("failed to advertise routes for %s: %v: %v", domain, routes, err) return } + e.updatePub.Publish(RouteUpdate{Advertise: routes}) e.mu.Lock() defer e.mu.Unlock() diff --git a/appc/appconnector_test.go b/appc/appconnector_test.go index c23908c28..ec49d6dcc 100644 --- a/appc/appconnector_test.go +++ b/appc/appconnector_test.go @@ -39,8 +39,9 @@ func TestUpdateDomains(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: &appctest.RouteCollector{}}) } - a.UpdateDomains([]string{"example.com"}) + t.Cleanup(a.Close) + a.UpdateDomains([]string{"example.com"}) a.Wait(ctx) if got, want := a.Domains().AsSlice(), []string{"example.com"}; !slices.Equal(got, want) { t.Errorf("got %v; want %v", got, want) @@ -80,6 +81,8 @@ func TestUpdateRoutes(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) + a.updateDomains([]string{"*.example.com"}) // This route should be collapsed into the range @@ -136,6 +139,8 @@ func TestUpdateRoutesUnadvertisesContainedRoutes(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) + mak.Set(&a.domains, "example.com", []netip.Addr{netip.MustParseAddr("192.0.2.1")}) rc.SetRoutes([]netip.Prefix{netip.MustParsePrefix("192.0.2.1/32")}) routes := []netip.Prefix{netip.MustParsePrefix("192.0.2.0/24")} @@ -164,6 +169,7 @@ func TestDomainRoutes(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) a.updateDomains([]string{"example.com"}) if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil { t.Errorf("ObserveDNSResponse: %v", err) @@ -197,6 +203,7 @@ func TestObserveDNSResponse(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) // a has no domains configured, so it should not advertise any routes if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil { @@ -293,6 +300,7 @@ func TestWildcardDomains(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) a.updateDomains([]string{"*.example.com"}) if err := a.ObserveDNSResponse(dnsResponse("foo.example.com.", "192.0.0.8")); err != nil { @@ -460,6 +468,8 @@ func TestUpdateRouteRouteRemoval(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) + // nothing has yet been advertised assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{}) @@ -512,6 +522,8 @@ func TestUpdateDomainRouteRemoval(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) + assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{}) a.UpdateDomainsAndRoutes([]string{"a.example.com", "b.example.com"}, []netip.Prefix{}) @@ -574,6 +586,8 @@ func TestUpdateWildcardRouteRemoval(t *testing.T) { } else { a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc}) } + t.Cleanup(a.Close) + assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{}) a.UpdateDomainsAndRoutes([]string{"a.example.com", "*.b.example.com"}, []netip.Prefix{}) @@ -719,6 +733,7 @@ func TestUpdateRoutesDeadlock(t *testing.T) { RouteInfo: &RouteInfo{}, StoreRoutesFunc: fakeStoreRoutes, }) + t.Cleanup(a.Close) advertiseCalled := new(atomic.Bool) unadvertiseCalled := new(atomic.Bool) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 613b35b89..5a67398b4 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -1152,6 +1152,7 @@ func (b *LocalBackend) Shutdown() { } extHost := b.extHost b.extHost = nil + b.appConnector.Close() b.mu.Unlock() b.webClientShutdown() @@ -5042,6 +5043,7 @@ func (b *LocalBackend) reconfigAppConnectorLocked(nm *netmap.NetworkMap, prefs i }() if !prefs.AppConnector().Advertise { + b.appConnector.Close() // clean up a previous connector (safe on nil) b.appConnector = nil return } @@ -5061,6 +5063,8 @@ func (b *LocalBackend) reconfigAppConnectorLocked(nm *netmap.NetworkMap, prefs i } storeFunc = b.storeRouteInfo } + + b.appConnector.Close() // clean up a previous connector (safe on nil) b.appConnector = appc.NewAppConnector(appc.Config{ Logf: b.logf, EventBus: b.sys.Bus.Get(),