appc: publish events for route updates and storage

Nothing subscribes to them yet, and we need tests.

Add a Close method to the AppConnector to make sure the client gets cleaned up
when the connector is dropped (we re-create connectors).

Change-Id: I184670ba2fb920e0d2cb2be7c6816259bca77afe
This commit is contained in:
M. J. Fromberger 2025-09-12 09:55:39 -07:00
parent c563c3a8ef
commit ccbf115875
3 changed files with 70 additions and 6 deletions

View File

@ -12,6 +12,7 @@ package appc
import (
"context"
"fmt"
"maps"
"net/netip"
"slices"
"strings"
@ -129,6 +130,13 @@ type RouteInfo struct {
Wildcards []string `json:",omitempty"`
}
// RouteUpdate records a set of routes that should be advertised and a set of
// routes that should be unadvertised in event bus updates.
type RouteUpdate struct {
Advertise []netip.Prefix
Unadvertise []netip.Prefix
}
// AppConnector is an implementation of an AppConnector that performs
// its function as a subsystem inside of a tailscale node. At the control plane
// side App Connector routing is configured in terms of domains rather than IP
@ -143,6 +151,9 @@ type AppConnector struct {
logf logger.Logf
eventBus *eventbus.Bus
routeAdvertiser RouteAdvertiser
pubClient *eventbus.Client
updatePub *eventbus.Publisher[RouteUpdate]
storePub *eventbus.Publisher[RouteInfo]
// storeRoutesFunc will be called to persist routes if it is not nil.
storeRoutesFunc func(*RouteInfo) error
@ -200,10 +211,14 @@ func NewAppConnector(c Config) *AppConnector {
case c.RouteAdvertiser == nil:
panic("missing route advertiser")
}
ec := c.EventBus.Client("appc.AppConnector")
ac := &AppConnector{
logf: logger.WithPrefix(c.Logf, "appc: "),
eventBus: c.EventBus,
pubClient: ec,
updatePub: eventbus.Publish[RouteUpdate](ec),
storePub: eventbus.Publish[RouteInfo](ec),
routeAdvertiser: c.RouteAdvertiser,
storeRoutesFunc: c.StoreRoutesFunc,
}
@ -230,6 +245,14 @@ func (e *AppConnector) ShouldStoreRoutes() bool {
// storeRoutesLocked takes the current state of the AppConnector and persists it
func (e *AppConnector) storeRoutesLocked() error {
if e.storePub.ShouldPublish() {
e.storePub.Publish(RouteInfo{
// Clone here, as the subscriber will handle these outside our lock.
Control: slices.Clone(e.controlRoutes),
Domains: maps.Clone(e.domains),
Wildcards: slices.Clone(e.wildcards),
})
}
if !e.ShouldStoreRoutes() {
return nil
}
@ -242,6 +265,7 @@ func (e *AppConnector) storeRoutesLocked() error {
e.writeRateMinute.update(numRoutes)
e.writeRateDay.update(numRoutes)
// TODO(creachdair): Remove this once it's delivered over the event bus.
return e.storeRoutesFunc(&RouteInfo{
Control: e.controlRoutes,
Domains: e.domains,
@ -285,6 +309,18 @@ func (e *AppConnector) Wait(ctx context.Context) {
e.queue.Wait(ctx)
}
// Close closes the connector and cleans up resources associated with it.
// It is safe (and a noop) to call Close on nil.
func (e *AppConnector) Close() {
if e == nil {
return
}
e.mu.Lock()
defer e.mu.Unlock()
e.queue.Shutdown() // TODO(creachadair): Should we wait for it too?
e.pubClient.Close()
}
func (e *AppConnector) updateDomains(domains []string) {
e.mu.Lock()
defer e.mu.Unlock()
@ -325,11 +361,15 @@ func (e *AppConnector) updateDomains(domains []string) {
toRemove = append(toRemove, netip.PrefixFrom(a, a.BitLen()))
}
}
if len(toRemove) != 0 {
e.queue.Add(func() {
if err := e.routeAdvertiser.UnadvertiseRoute(toRemove...); err != nil {
e.logf("failed to unadvertise routes on domain removal: %v: %v: %v", slicesx.MapKeys(oldDomains), toRemove, err)
}
})
e.updatePub.Publish(RouteUpdate{Unadvertise: toRemove})
}
}
e.logf("handling domains: %v and wildcards: %v", slicesx.MapKeys(e.domains), e.wildcards)
@ -379,6 +419,10 @@ nextRoute:
e.logf("failed to unadvertise routes: %v: %v", toRemove, err)
}
})
e.updatePub.Publish(RouteUpdate{
Advertise: routes,
Unadvertise: toRemove,
})
e.controlRoutes = routes
if err := e.storeRoutesLocked(); err != nil {
@ -584,6 +628,7 @@ func (e *AppConnector) scheduleAdvertisement(domain string, routes ...netip.Pref
e.logf("failed to advertise routes for %s: %v: %v", domain, routes, err)
return
}
e.updatePub.Publish(RouteUpdate{Advertise: routes})
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -39,8 +39,9 @@ func TestUpdateDomains(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: &appctest.RouteCollector{}})
}
a.UpdateDomains([]string{"example.com"})
t.Cleanup(a.Close)
a.UpdateDomains([]string{"example.com"})
a.Wait(ctx)
if got, want := a.Domains().AsSlice(), []string{"example.com"}; !slices.Equal(got, want) {
t.Errorf("got %v; want %v", got, want)
@ -80,6 +81,8 @@ func TestUpdateRoutes(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
a.updateDomains([]string{"*.example.com"})
// This route should be collapsed into the range
@ -136,6 +139,8 @@ func TestUpdateRoutesUnadvertisesContainedRoutes(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
mak.Set(&a.domains, "example.com", []netip.Addr{netip.MustParseAddr("192.0.2.1")})
rc.SetRoutes([]netip.Prefix{netip.MustParsePrefix("192.0.2.1/32")})
routes := []netip.Prefix{netip.MustParsePrefix("192.0.2.0/24")}
@ -164,6 +169,7 @@ func TestDomainRoutes(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
a.updateDomains([]string{"example.com"})
if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
t.Errorf("ObserveDNSResponse: %v", err)
@ -197,6 +203,7 @@ func TestObserveDNSResponse(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
// a has no domains configured, so it should not advertise any routes
if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
@ -293,6 +300,7 @@ func TestWildcardDomains(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
a.updateDomains([]string{"*.example.com"})
if err := a.ObserveDNSResponse(dnsResponse("foo.example.com.", "192.0.0.8")); err != nil {
@ -460,6 +468,8 @@ func TestUpdateRouteRouteRemoval(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
// nothing has yet been advertised
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
@ -512,6 +522,8 @@ func TestUpdateDomainRouteRemoval(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
a.UpdateDomainsAndRoutes([]string{"a.example.com", "b.example.com"}, []netip.Prefix{})
@ -574,6 +586,8 @@ func TestUpdateWildcardRouteRemoval(t *testing.T) {
} else {
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
}
t.Cleanup(a.Close)
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
a.UpdateDomainsAndRoutes([]string{"a.example.com", "*.b.example.com"}, []netip.Prefix{})
@ -719,6 +733,7 @@ func TestUpdateRoutesDeadlock(t *testing.T) {
RouteInfo: &RouteInfo{},
StoreRoutesFunc: fakeStoreRoutes,
})
t.Cleanup(a.Close)
advertiseCalled := new(atomic.Bool)
unadvertiseCalled := new(atomic.Bool)

View File

@ -1152,6 +1152,7 @@ func (b *LocalBackend) Shutdown() {
}
extHost := b.extHost
b.extHost = nil
b.appConnector.Close()
b.mu.Unlock()
b.webClientShutdown()
@ -5042,6 +5043,7 @@ func (b *LocalBackend) reconfigAppConnectorLocked(nm *netmap.NetworkMap, prefs i
}()
if !prefs.AppConnector().Advertise {
b.appConnector.Close() // clean up a previous connector (safe on nil)
b.appConnector = nil
return
}
@ -5061,6 +5063,8 @@ func (b *LocalBackend) reconfigAppConnectorLocked(nm *netmap.NetworkMap, prefs i
}
storeFunc = b.storeRouteInfo
}
b.appConnector.Close() // clean up a previous connector (safe on nil)
b.appConnector = appc.NewAppConnector(appc.Config{
Logf: b.logf,
EventBus: b.sys.Bus.Get(),