diff --git a/cmd/tailscaled/debug.go b/cmd/tailscaled/debug.go index 2f469a0d1..85dd787c1 100644 --- a/cmd/tailscaled/debug.go +++ b/cmd/tailscaled/debug.go @@ -161,7 +161,9 @@ func getURL(ctx context.Context, urlStr string) error { } func checkDerp(ctx context.Context, derpRegion string) (err error) { - ht := new(health.Tracker) + bus := eventbus.New() + defer bus.Close() + ht := health.NewTracker(bus) req, err := http.NewRequestWithContext(ctx, "GET", ipn.DefaultControlURL+"/derpmap/default", nil) if err != nil { return fmt.Errorf("create derp map request: %w", err) diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index 890ff7bf8..734c8e8e8 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -426,7 +426,7 @@ func run() (err error) { sys.Set(netMon) } - pol := logpolicy.New(logtail.CollectionNode, netMon, sys.HealthTracker(), nil /* use log.Printf */) + pol := logpolicy.New(logtail.CollectionNode, netMon, sys.HealthTracker.Get(), nil /* use log.Printf */) pol.SetVerbosityLevel(args.verbose) logPol = pol defer func() { @@ -461,7 +461,7 @@ func run() (err error) { // Always clean up, even if we're going to run the server. This covers cases // such as when a system was rebooted without shutting down, or tailscaled // crashed, and would for example restore system DNS configuration. - dns.CleanUp(logf, netMon, sys.HealthTracker(), args.tunname) + dns.CleanUp(logf, netMon, sys.HealthTracker.Get(), args.tunname) router.CleanUp(logf, netMon, args.tunname) // If the cleanUp flag was passed, then exit. if args.cleanUp { @@ -749,7 +749,7 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo conf := wgengine.Config{ ListenPort: args.port, NetMon: sys.NetMon.Get(), - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), Dialer: sys.Dialer.Get(), SetSubsystem: sys.Set, @@ -760,7 +760,7 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo f(&conf, logf) } - sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry()) + sys.HealthTracker.Get().SetMetricsRegistry(sys.UserMetricsRegistry()) onlyNetstack = name == "userspace-networking" netstackSubnetRouter := onlyNetstack // but mutated later on some platforms @@ -781,7 +781,7 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo // configuration being unavailable (from the noop // manager). More in Issue 4017. // TODO(bradfitz): add a Synology-specific DNS manager. - conf.DNS, err = dns.NewOSConfigurator(logf, sys.HealthTracker(), sys.PolicyClientOrDefault(), sys.ControlKnobs(), "") // empty interface name + conf.DNS, err = dns.NewOSConfigurator(logf, sys.HealthTracker.Get(), sys.PolicyClientOrDefault(), sys.ControlKnobs(), "") // empty interface name if err != nil { return false, fmt.Errorf("dns.NewOSConfigurator: %w", err) } @@ -809,13 +809,13 @@ func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack boo sys.NetMon.Get().SetTailscaleInterfaceName(devName) } - r, err := router.New(logf, dev, sys.NetMon.Get(), sys.HealthTracker(), sys.Bus.Get()) + r, err := router.New(logf, dev, sys.NetMon.Get(), sys.HealthTracker.Get(), sys.Bus.Get()) if err != nil { dev.Close() return false, fmt.Errorf("creating router: %w", err) } - d, err := dns.NewOSConfigurator(logf, sys.HealthTracker(), sys.PolicyClientOrDefault(), sys.ControlKnobs(), devName) + d, err := dns.NewOSConfigurator(logf, sys.HealthTracker.Get(), sys.PolicyClientOrDefault(), sys.ControlKnobs(), devName) if err != nil { dev.Close() r.Close() diff --git a/cmd/tsconnect/wasm/wasm_js.go b/cmd/tsconnect/wasm/wasm_js.go index 87f814866..ea40dba9c 100644 --- a/cmd/tsconnect/wasm/wasm_js.go +++ b/cmd/tsconnect/wasm/wasm_js.go @@ -108,7 +108,7 @@ func newIPN(jsConfig js.Value) map[string]any { Dialer: dialer, SetSubsystem: sys.Set, ControlKnobs: sys.ControlKnobs(), - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), EventBus: sys.Bus.Get(), }) diff --git a/control/controlclient/controlclient_test.go b/control/controlclient/controlclient_test.go index 2efc27b5e..78646d76a 100644 --- a/control/controlclient/controlclient_test.go +++ b/control/controlclient/controlclient_test.go @@ -236,7 +236,7 @@ func TestDirectProxyManual(t *testing.T) { }, DiscoPublicKey: key.NewDisco().Public(), Logf: t.Logf, - HealthTracker: &health.Tracker{}, + HealthTracker: health.NewTracker(bus), PopBrowserURL: func(url string) { t.Logf("PopBrowserURL: %q", url) }, @@ -328,7 +328,7 @@ func testHTTPS(t *testing.T, withProxy bool) { }, DiscoPublicKey: key.NewDisco().Public(), Logf: t.Logf, - HealthTracker: &health.Tracker{}, + HealthTracker: health.NewTracker(bus), PopBrowserURL: func(url string) { t.Logf("PopBrowserURL: %q", url) }, diff --git a/control/controlclient/map_test.go b/control/controlclient/map_test.go index ff5df8207..59b8988fc 100644 --- a/control/controlclient/map_test.go +++ b/control/controlclient/map_test.go @@ -28,6 +28,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/netmap" "tailscale.com/types/ptr" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/mak" "tailscale.com/util/must" ) @@ -1326,7 +1327,7 @@ func TestNetmapDisplayMessage(t *testing.T) { // [netmap.NetworkMap] to a [health.Tracker]. func TestNetmapHealthIntegration(t *testing.T) { ms := newTestMapSession(t, nil) - ht := health.Tracker{} + ht := health.NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() @@ -1371,7 +1372,7 @@ func TestNetmapHealthIntegration(t *testing.T) { // passing the [netmap.NetworkMap] to a [health.Tracker]. func TestNetmapDisplayMessageIntegration(t *testing.T) { ms := newTestMapSession(t, nil) - ht := health.Tracker{} + ht := health.NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() diff --git a/control/controlhttp/http_test.go b/control/controlhttp/http_test.go index daf262023..0b4e117f9 100644 --- a/control/controlhttp/http_test.go +++ b/control/controlhttp/http_test.go @@ -35,6 +35,7 @@ import ( "tailscale.com/tstime" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/util/eventbus/eventbustest" ) type httpTestParam struct { @@ -228,7 +229,7 @@ func testControlHTTP(t *testing.T, param httpTestParam) { omitCertErrorLogging: true, testFallbackDelay: fallbackDelay, Clock: clock, - HealthTracker: new(health.Tracker), + HealthTracker: health.NewTracker(eventbustest.NewBus(t)), } if param.httpInDial { @@ -730,7 +731,7 @@ func TestDialPlan(t *testing.T) { omitCertErrorLogging: true, testFallbackDelay: 50 * time.Millisecond, Clock: clock, - HealthTracker: new(health.Tracker), + HealthTracker: health.NewTracker(eventbustest.NewBus(t)), } conn, err := a.dial(ctx) diff --git a/health/health.go b/health/health.go index 058870438..c456b53cb 100644 --- a/health/health.go +++ b/health/health.go @@ -25,6 +25,7 @@ import ( "tailscale.com/tstime" "tailscale.com/types/opt" "tailscale.com/util/cibuild" + "tailscale.com/util/eventbus" "tailscale.com/util/mak" "tailscale.com/util/multierr" "tailscale.com/util/set" @@ -76,6 +77,9 @@ type Tracker struct { testClock tstime.Clock // nil means use time.Now / tstime.StdClock{} + eventClient *eventbus.Client + changePub *eventbus.Publisher[Change] + // mu guards everything that follows. mu sync.Mutex @@ -119,6 +123,20 @@ type Tracker struct { metricHealthMessage *metrics.MultiLabelMap[metricHealthMessageLabel] } +// NewTracker contructs a new [Tracker] and attaches the given eventbus. +// NewTracker will panic is no eventbus is given. +func NewTracker(bus *eventbus.Bus) *Tracker { + if bus == nil { + panic("no eventbus set") + } + + cli := bus.Client("health.Tracker") + return &Tracker{ + eventClient: cli, + changePub: eventbus.Publish[Change](cli), + } +} + func (t *Tracker) now() time.Time { if t.testClock != nil { return t.testClock.Now() @@ -418,6 +436,28 @@ func (t *Tracker) setUnhealthyLocked(w *Warnable, args Args) { Warnable: w, UnhealthyState: w.unhealthyState(ws), } + // Publish the change to the event bus. If the change is already visible + // now, publish it immediately; otherwise queue a timer to publish it at + // a future time when it becomes visible. + if w.IsVisible(ws, t.now) { + t.changePub.Publish(change) + } else { + visibleIn := w.TimeToVisible - t.now().Sub(brokenSince) + tc := t.clock().AfterFunc(visibleIn, func() { + t.mu.Lock() + defer t.mu.Unlock() + // Check if the Warnable is still unhealthy, as it could have become healthy between the time + // the timer was set for and the time it was executed. + if t.warnableVal[w] != nil { + t.changePub.Publish(change) + delete(t.pendingVisibleTimers, w) + } + }) + mak.Set(&t.pendingVisibleTimers, w, tc) + } + + // Direct callbacks + // TODO(cmol): Remove once all watchers have been moved to events for _, cb := range t.watchers { // If the Warnable has been unhealthy for more than its TimeToVisible, the callback should be // executed immediately. Otherwise, the callback should be enqueued to run once the Warnable @@ -473,7 +513,9 @@ func (t *Tracker) setHealthyLocked(w *Warnable) { WarnableChanged: true, Warnable: w, } + t.changePub.Publish(change) for _, cb := range t.watchers { + // TODO(cmol): Remove once all watchers have been moved to events cb(change) } } @@ -484,7 +526,11 @@ func (t *Tracker) notifyWatchersControlChangedLocked() { change := Change{ ControlHealthChanged: true, } + if t.changePub != nil { + t.changePub.Publish(change) + } for _, cb := range t.watchers { + // TODO(cmol): Remove once all watchers have been moved to events cb(change) } } diff --git a/health/health_test.go b/health/health_test.go index d66cea06c..c55b0e1f3 100644 --- a/health/health_test.go +++ b/health/health_test.go @@ -18,12 +18,34 @@ import ( "tailscale.com/tstest" "tailscale.com/tstime" "tailscale.com/types/opt" + "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/usermetric" "tailscale.com/version" ) +func wantChange(c Change) func(c Change) (bool, error) { + return func(cEv Change) (bool, error) { + if cEv.ControlHealthChanged != c.ControlHealthChanged { + return false, fmt.Errorf("expected ControlHealthChanged %t, got %t", c.ControlHealthChanged, cEv.ControlHealthChanged) + } + if cEv.WarnableChanged != c.WarnableChanged { + return false, fmt.Errorf("expected WarnableChanged %t, got %t", c.WarnableChanged, cEv.WarnableChanged) + } + if c.Warnable != nil && (cEv.Warnable == nil || cEv.Warnable != c.Warnable) { + return false, fmt.Errorf("expected Warnable %+v, got %+v", c.Warnable, cEv.Warnable) + } + + if c.UnhealthyState != nil { + panic("comparison of UnhealthyState is not yet supported") + } + + return true, nil + } +} + func TestAppendWarnableDebugFlags(t *testing.T) { - var tr Tracker + tr := NewTracker(eventbustest.NewBus(t)) for i := range 10 { w := Register(&Warnable{ @@ -68,7 +90,9 @@ func TestNilMethodsDontCrash(t *testing.T) { } func TestSetUnhealthyWithDuplicateThenHealthyAgain(t *testing.T) { - ht := Tracker{} + bus := eventbustest.NewBus(t) + watcher := eventbustest.NewWatcher(t, bus) + ht := NewTracker(bus) if len(ht.Strings()) != 0 { t.Fatalf("before first insertion, len(newTracker.Strings) = %d; want = 0", len(ht.Strings())) } @@ -92,10 +116,20 @@ func TestSetUnhealthyWithDuplicateThenHealthyAgain(t *testing.T) { if !reflect.DeepEqual(ht.Strings(), want) { t.Fatalf("after setting the healthy, newTracker.Strings() = %v; want = %v", ht.Strings(), want) } + + if err := eventbustest.ExpectExactly(watcher, + wantChange(Change{WarnableChanged: true, Warnable: testWarnable}), + wantChange(Change{WarnableChanged: true, Warnable: testWarnable}), + wantChange(Change{WarnableChanged: true, Warnable: testWarnable}), + ); err != nil { + t.Fatalf("expected events, got %q", err) + } } func TestRemoveAllWarnings(t *testing.T) { - ht := Tracker{} + bus := eventbustest.NewBus(t) + watcher := eventbustest.NewWatcher(t, bus) + ht := NewTracker(bus) if len(ht.Strings()) != 0 { t.Fatalf("before first insertion, len(newTracker.Strings) = %d; want = 0", len(ht.Strings())) } @@ -109,67 +143,105 @@ func TestRemoveAllWarnings(t *testing.T) { if len(ht.Strings()) != 0 { t.Fatalf("after RemoveAll, len(newTracker.Strings) = %d; want = 0", len(ht.Strings())) } + if err := eventbustest.ExpectExactly(watcher, + wantChange(Change{WarnableChanged: true, Warnable: testWarnable}), + wantChange(Change{WarnableChanged: true, Warnable: testWarnable}), + ); err != nil { + t.Fatalf("expected events, got %q", err) + } } // TestWatcher tests that a registered watcher function gets called with the correct // Warnable and non-nil/nil UnhealthyState upon setting a Warnable to unhealthy/healthy. func TestWatcher(t *testing.T) { - ht := Tracker{} - wantText := "Hello world" - becameUnhealthy := make(chan struct{}) - becameHealthy := make(chan struct{}) + tests := []struct { + name string + preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) + }{ + { + name: "with-callbacks", + preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) { + t.Cleanup(tht.RegisterWatcher(fn)) + if len(tht.watchers) != 1 { + t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers)) + } + }, + }, + { + name: "with-eventbus", + preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { + client := bus.Client("healthwatchertestclient") + sub := eventbus.Subscribe[Change](client) + go func() { + for { + select { + case <-sub.Done(): + return + case change := <-sub.Events(): + fn(change) + } + } + }() + }, + }, + } - watcherFunc := func(c Change) { - w := c.Warnable - us := c.UnhealthyState - if w != testWarnable { - t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, testWarnable) - } + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + bus := eventbustest.NewBus(t) + ht := NewTracker(bus) + wantText := "Hello world" + becameUnhealthy := make(chan struct{}) + becameHealthy := make(chan struct{}) - if us != nil { - if us.Text != wantText { - t.Fatalf("unexpected us.Text: %s, want: %s", us.Text, wantText) + watcherFunc := func(c Change) { + w := c.Warnable + us := c.UnhealthyState + if w != testWarnable { + t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, testWarnable) + } + + if us != nil { + if us.Text != wantText { + t.Fatalf("unexpected us.Text: %q, want: %s", us.Text, wantText) + } + if us.Args[ArgError] != wantText { + t.Fatalf("unexpected us.Args[ArgError]: %q, want: %s", us.Args[ArgError], wantText) + } + becameUnhealthy <- struct{}{} + } else { + becameHealthy <- struct{}{} + } } - if us.Args[ArgError] != wantText { - t.Fatalf("unexpected us.Args[ArgError]: %s, want: %s", us.Args[ArgError], wantText) + + // Set up test + tt.preFunc(t, ht, bus, watcherFunc) + + // Start running actual test + ht.SetUnhealthy(testWarnable, Args{ArgError: wantText}) + + select { + case <-becameUnhealthy: + // Test passed because the watcher got notified of an unhealthy state + case <-becameHealthy: + // Test failed because the watcher got of a healthy state instead of an unhealthy one + t.Fatalf("watcherFunc was called with a healthy state") + case <-time.After(5 * time.Second): + t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy") } - becameUnhealthy <- struct{}{} - } else { - becameHealthy <- struct{}{} - } - } - unregisterFunc := ht.RegisterWatcher(watcherFunc) - if len(ht.watchers) != 1 { - t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(ht.watchers)) - } - ht.SetUnhealthy(testWarnable, Args{ArgError: wantText}) + ht.SetHealthy(testWarnable) - select { - case <-becameUnhealthy: - // Test passed because the watcher got notified of an unhealthy state - case <-becameHealthy: - // Test failed because the watcher got of a healthy state instead of an unhealthy one - t.Fatalf("watcherFunc was called with a healthy state") - case <-time.After(1 * time.Second): - t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy") - } - - ht.SetHealthy(testWarnable) - - select { - case <-becameUnhealthy: - // Test failed because the watcher got of an unhealthy state instead of a healthy one - t.Fatalf("watcherFunc was called with an unhealthy state") - case <-becameHealthy: - // Test passed because the watcher got notified of a healthy state - case <-time.After(1 * time.Second): - t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy") - } - - unregisterFunc() - if len(ht.watchers) != 0 { - t.Fatalf("after unregisterFunc, len(newTracker.watchers) = %d; want = 0", len(ht.watchers)) + select { + case <-becameUnhealthy: + // Test failed because the watcher got of an unhealthy state instead of a healthy one + t.Fatalf("watcherFunc was called with an unhealthy state") + case <-becameHealthy: + // Test passed because the watcher got notified of a healthy state + case <-time.After(5 * time.Second): + t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy") + } + }) } } @@ -178,45 +250,81 @@ func TestWatcher(t *testing.T) { // has a TimeToVisible set, which means that a watcher should only be notified of an unhealthy state after // the TimeToVisible duration has passed. func TestSetUnhealthyWithTimeToVisible(t *testing.T) { - ht := Tracker{} - mw := Register(&Warnable{ - Code: "test-warnable-3-secs-to-visible", - Title: "Test Warnable with 3 seconds to visible", - Text: StaticMessage("Hello world"), - TimeToVisible: 2 * time.Second, - ImpactsConnectivity: true, - }) - defer unregister(mw) - - becameUnhealthy := make(chan struct{}) - becameHealthy := make(chan struct{}) - - watchFunc := func(c Change) { - w := c.Warnable - us := c.UnhealthyState - if w != mw { - t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, w) - } - - if us != nil { - becameUnhealthy <- struct{}{} - } else { - becameHealthy <- struct{}{} - } + tests := []struct { + name string + preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change)) + }{ + { + name: "with-callbacks", + preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) { + t.Cleanup(tht.RegisterWatcher(fn)) + if len(tht.watchers) != 1 { + t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers)) + } + }, + }, + { + name: "with-eventbus", + preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) { + client := bus.Client("healthwatchertestclient") + sub := eventbus.Subscribe[Change](client) + go func() { + for { + select { + case <-sub.Done(): + return + case change := <-sub.Events(): + fn(change) + } + } + }() + }, + }, } + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + bus := eventbustest.NewBus(t) + ht := NewTracker(bus) + mw := Register(&Warnable{ + Code: "test-warnable-3-secs-to-visible", + Title: "Test Warnable with 3 seconds to visible", + Text: StaticMessage("Hello world"), + TimeToVisible: 2 * time.Second, + ImpactsConnectivity: true, + }) - ht.RegisterWatcher(watchFunc) - ht.SetUnhealthy(mw, Args{ArgError: "Hello world"}) + becameUnhealthy := make(chan struct{}) + becameHealthy := make(chan struct{}) - select { - case <-becameUnhealthy: - // Test failed because the watcher got notified of an unhealthy state - t.Fatalf("watcherFunc was called with an unhealthy state") - case <-becameHealthy: - // Test failed because the watcher got of a healthy state - t.Fatalf("watcherFunc was called with a healthy state") - case <-time.After(1 * time.Second): - // As expected, watcherFunc still had not been called after 1 second + watchFunc := func(c Change) { + w := c.Warnable + us := c.UnhealthyState + if w != mw { + t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, w) + } + + if us != nil { + becameUnhealthy <- struct{}{} + } else { + becameHealthy <- struct{}{} + } + } + + tt.preFunc(t, ht, bus, watchFunc) + ht.SetUnhealthy(mw, Args{ArgError: "Hello world"}) + + select { + case <-becameUnhealthy: + // Test failed because the watcher got notified of an unhealthy state + t.Fatalf("watcherFunc was called with an unhealthy state") + case <-becameHealthy: + // Test failed because the watcher got of a healthy state + t.Fatalf("watcherFunc was called with a healthy state") + case <-time.After(1 * time.Second): + // As expected, watcherFunc still had not been called after 1 second + } + unregister(mw) + }) } } @@ -242,7 +350,7 @@ func TestRegisterWarnablePanicsWithDuplicate(t *testing.T) { // TestCheckDependsOnAppearsInUnhealthyState asserts that the DependsOn field in the UnhealthyState // is populated with the WarnableCode(s) of the Warnable(s) that a warning depends on. func TestCheckDependsOnAppearsInUnhealthyState(t *testing.T) { - ht := Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) w1 := Register(&Warnable{ Code: "w1", Text: StaticMessage("W1 Text"), @@ -352,11 +460,11 @@ func TestShowUpdateWarnable(t *testing.T) { } for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - tr := &Tracker{ - checkForUpdates: tt.check, - applyUpdates: tt.apply, - latestVersion: tt.cv, - } + tr := NewTracker(eventbustest.NewBus(t)) + tr.checkForUpdates = tt.check + tr.applyUpdates = tt.apply + tr.latestVersion = tt.cv + gotWarnable, gotShow := tr.showUpdateWarnable() if gotWarnable != tt.wantWarnable { t.Errorf("got warnable: %v, want: %v", gotWarnable, tt.wantWarnable) @@ -401,11 +509,10 @@ func TestHealthMetric(t *testing.T) { } for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - tr := &Tracker{ - checkForUpdates: tt.check, - applyUpdates: tt.apply, - latestVersion: tt.cv, - } + tr := NewTracker(eventbustest.NewBus(t)) + tr.checkForUpdates = tt.check + tr.applyUpdates = tt.apply + tr.latestVersion = tt.cv tr.SetMetricsRegistry(&usermetric.Registry{}) if val := tr.metricHealthMessage.Get(metricHealthMessageLabel{Type: MetricLabelWarning}).String(); val != strconv.Itoa(tt.wantMetricCount) { t.Fatalf("metric value: %q, want: %q", val, strconv.Itoa(tt.wantMetricCount)) @@ -426,9 +533,8 @@ func TestNoDERPHomeWarnable(t *testing.T) { Start: time.Unix(123, 0), FollowRealTime: false, }) - ht := &Tracker{ - testClock: clock, - } + ht := NewTracker(eventbustest.NewBus(t)) + ht.testClock = clock ht.SetIPNState("NeedsLogin", true) // Advance 30 seconds to get past the "recentlyLoggedIn" check. @@ -448,7 +554,7 @@ func TestNoDERPHomeWarnable(t *testing.T) { // but doesn't use tstest.Clock so avoids the deadlock // I hit: https://github.com/tailscale/tailscale/issues/14798 func TestNoDERPHomeWarnableManual(t *testing.T) { - ht := &Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) // Avoid wantRunning: @@ -462,7 +568,7 @@ func TestNoDERPHomeWarnableManual(t *testing.T) { } func TestControlHealth(t *testing.T) { - ht := Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() @@ -620,7 +726,7 @@ func TestControlHealthNotifies(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ht := Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() @@ -643,7 +749,7 @@ func TestControlHealthNotifies(t *testing.T) { } func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) { - ht := Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) gotNotified := false @@ -671,7 +777,7 @@ func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) { // created from Control health & returned by [Tracker.CurrentState] is different // when the details of the [tailcfg.DisplayMessage] are different. func TestCurrentStateETagControlHealth(t *testing.T) { - ht := Tracker{} + ht := NewTracker(eventbustest.NewBus(t)) ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() @@ -776,9 +882,8 @@ func TestCurrentStateETagControlHealth(t *testing.T) { // when the details of the Warnable are different. func TestCurrentStateETagWarnable(t *testing.T) { newTracker := func(clock tstime.Clock) *Tracker { - ht := &Tracker{ - testClock: clock, - } + ht := NewTracker(eventbustest.NewBus(t)) + ht.testClock = clock ht.SetIPNState("NeedsLogin", true) ht.GotStreamedMapResponse() return ht diff --git a/ipn/ipnlocal/extension_host_test.go b/ipn/ipnlocal/extension_host_test.go index 509833ff6..f5c081a5b 100644 --- a/ipn/ipnlocal/extension_host_test.go +++ b/ipn/ipnlocal/extension_host_test.go @@ -32,6 +32,7 @@ import ( "tailscale.com/types/lazy" "tailscale.com/types/logger" "tailscale.com/types/persist" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" ) @@ -847,7 +848,7 @@ func TestBackgroundProfileResolver(t *testing.T) { // Create a new profile manager and add the profiles to it. // We expose the profile manager to the extensions via the read-only [ipnext.ProfileStore] interface. - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) for i, p := range tt.profiles { // Generate a unique ID and key for each profile, // unless the profile already has them set diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 6d92e58d0..4c27bea45 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -206,6 +206,7 @@ type LocalBackend struct { eventClient *eventbus.Client clientVersionSub *eventbus.Subscriber[tailcfg.ClientVersion] autoUpdateSub *eventbus.Subscriber[controlclient.AutoUpdate] + healthChangeSub *eventbus.Subscriber[health.Change] subsDoneCh chan struct{} // closed when consumeEventbusTopics returns health *health.Tracker // always non-nil polc policyclient.Client // always non-nil @@ -216,7 +217,6 @@ type LocalBackend struct { pushDeviceToken syncs.AtomicValue[string] backendLogID logid.PublicID unregisterNetMon func() - unregisterHealthWatch func() unregisterSysPolicyWatch func() portpoll *portlist.Poller // may be nil portpollOnce sync.Once // guards starting readPoller @@ -488,7 +488,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo if loginFlags&controlclient.LocalBackendStartKeyOSNeutral != 0 { goos = "" } - pm, err := newProfileManagerWithGOOS(store, logf, sys.HealthTracker(), goos) + pm, err := newProfileManagerWithGOOS(store, logf, sys.HealthTracker.Get(), goos) if err != nil { return nil, err } @@ -521,7 +521,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo statsLogf: logger.LogOnChange(logf, 5*time.Minute, clock.Now), sys: sys, polc: sys.PolicyClientOrDefault(), - health: sys.HealthTracker(), + health: sys.HealthTracker.Get(), metrics: m, e: e, dialer: dialer, @@ -543,6 +543,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo b.eventClient = b.Sys().Bus.Get().Client("ipnlocal.LocalBackend") b.clientVersionSub = eventbus.Subscribe[tailcfg.ClientVersion](b.eventClient) b.autoUpdateSub = eventbus.Subscribe[controlclient.AutoUpdate](b.eventClient) + b.healthChangeSub = eventbus.Subscribe[health.Change](b.eventClient) nb := newNodeBackend(ctx, b.sys.Bus.Get()) b.currentNodeAtomic.Store(nb) nb.ready() @@ -570,7 +571,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo }() netMon := sys.NetMon.Get() - b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker()) + b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker.Get()) if err != nil { log.Printf("error setting up sockstat logger: %v", err) } @@ -595,8 +596,6 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()}) b.unregisterNetMon = netMon.RegisterChangeCallback(b.linkChange) - b.unregisterHealthWatch = b.health.RegisterWatcher(b.onHealthChange) - if tunWrap, ok := b.sys.Tun.GetOK(); ok { tunWrap.PeerAPIPort = b.GetPeerAPIPort } else { @@ -628,12 +627,17 @@ func (b *LocalBackend) consumeEventbusTopics() { for { select { + // TODO(cmol): Move to using b.eventClient.Done() once implemented. + // In the meantime, we rely on the subs not going away until the client is + // closed, closing all its subscribers. case <-b.clientVersionSub.Done(): return case clientVersion := <-b.clientVersionSub.Events(): b.onClientVersion(&clientVersion) case au := <-b.autoUpdateSub.Events(): b.onTailnetDefaultAutoUpdate(au.Value) + case change := <-b.healthChangeSub.Events(): + b.onHealthChange(change) } } } @@ -1162,7 +1166,6 @@ func (b *LocalBackend) Shutdown() { b.stopOfflineAutoUpdate() b.unregisterNetMon() - b.unregisterHealthWatch() b.unregisterSysPolicyWatch() if cc != nil { cc.Shutdown() diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 261d5c4c2..354cf6864 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -470,7 +470,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { t.Log("Added memory store for testing") } if _, ok := sys.Engine.GetOK(); !ok { - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -2897,7 +2897,7 @@ func TestSetExitNodeIDPolicy(t *testing.T) { if test.prefs == nil { test.prefs = ipn.NewPrefs() } - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) pm.prefs = test.prefs.View() b.currentNode().SetNetMap(test.nm) b.pm = pm @@ -3501,7 +3501,7 @@ func TestApplySysPolicy(t *testing.T) { wantPrefs.ControlURL = ipn.DefaultControlURL } - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) pm.prefs = usePrefs.View() b := newTestBackend(t, polc) @@ -5802,7 +5802,7 @@ func newLocalBackendWithSysAndTestControl(t *testing.T, enableLogging bool, sys sys.Set(store) } if _, hasEngine := sys.Engine.GetOK(); !hasEngine { - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/ipnlocal/loglines_test.go b/ipn/ipnlocal/loglines_test.go index 5bea6cabc..d831aa8b0 100644 --- a/ipn/ipnlocal/loglines_test.go +++ b/ipn/ipnlocal/loglines_test.go @@ -50,7 +50,7 @@ func TestLocalLogLines(t *testing.T) { sys := tsd.NewSystem() store := new(mem.Store) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatal(err) } diff --git a/ipn/ipnlocal/network-lock_test.go b/ipn/ipnlocal/network-lock_test.go index 93ecd977f..0d3f7db43 100644 --- a/ipn/ipnlocal/network-lock_test.go +++ b/ipn/ipnlocal/network-lock_test.go @@ -35,6 +35,7 @@ import ( "tailscale.com/types/netmap" "tailscale.com/types/persist" "tailscale.com/types/tkatype" + "tailscale.com/util/eventbus" "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" "tailscale.com/util/set" @@ -46,7 +47,7 @@ func (f observerFunc) SetControlClientStatus(_ controlclient.Client, s controlcl f(s) } -func fakeControlClient(t *testing.T, c *http.Client) *controlclient.Auto { +func fakeControlClient(t *testing.T, c *http.Client) (*controlclient.Auto, *eventbus.Bus) { hi := hostinfo.New() ni := tailcfg.NetInfo{LinkType: "wired"} hi.NetInfo = &ni @@ -70,7 +71,7 @@ func fakeControlClient(t *testing.T, c *http.Client) *controlclient.Auto { if err != nil { t.Fatal(err) } - return cc + return cc, bus } func fakeNoiseServer(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *http.Client) { @@ -158,8 +159,8 @@ func TestTKAEnablementFlow(t *testing.T) { defer ts.Close() temp := t.TempDir() - cc := fakeControlClient(t, client) - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + cc, bus := fakeControlClient(t, client) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(bus))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -199,7 +200,7 @@ func TestTKADisablementFlow(t *testing.T) { nlPriv := key.NewNLPrivate() key := tka.Key{Kind: tka.Key25519, Public: nlPriv.Public().Verifier(), Votes: 2} - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -267,7 +268,7 @@ func TestTKADisablementFlow(t *testing.T) { })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -391,7 +392,7 @@ func TestTKASync(t *testing.T) { t.Run(tc.name, func(t *testing.T) { nodePriv := key.NewNode() nlPriv := key.NewNLPrivate() - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -518,7 +519,7 @@ func TestTKASync(t *testing.T) { defer ts.Close() // Setup the client. - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -707,7 +708,7 @@ func TestTKADisable(t *testing.T) { disablementSecret := bytes.Repeat([]byte{0xa5}, 32) nlPriv := key.NewNLPrivate() - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -769,7 +770,7 @@ func TestTKADisable(t *testing.T) { })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -798,7 +799,7 @@ func TestTKASign(t *testing.T) { toSign := key.NewNode() nlPriv := key.NewNLPrivate() - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -860,7 +861,7 @@ func TestTKASign(t *testing.T) { } })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -887,7 +888,7 @@ func TestTKAForceDisable(t *testing.T) { nlPriv := key.NewNLPrivate() key := tka.Key{Kind: tka.Key25519, Public: nlPriv.Public().Verifier(), Votes: 2} - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -940,7 +941,7 @@ func TestTKAForceDisable(t *testing.T) { })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) sys := tsd.NewSystem() sys.Set(pm.Store()) @@ -985,7 +986,7 @@ func TestTKAAffectedSigs(t *testing.T) { // toSign := key.NewNode() nlPriv := key.NewNLPrivate() - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -1076,7 +1077,7 @@ func TestTKAAffectedSigs(t *testing.T) { } })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -1118,7 +1119,7 @@ func TestTKARecoverCompromisedKeyFlow(t *testing.T) { cosignPriv := key.NewNLPrivate() compromisedPriv := key.NewNLPrivate() - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, @@ -1188,7 +1189,7 @@ func TestTKARecoverCompromisedKeyFlow(t *testing.T) { } })) defer ts.Close() - cc := fakeControlClient(t, client) + cc, _ := fakeControlClient(t, client) b := LocalBackend{ varRoot: temp, cc: cc, @@ -1209,7 +1210,7 @@ func TestTKARecoverCompromisedKeyFlow(t *testing.T) { // Cosign using the cosigning key. { - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) must.Do(pm.SetPrefs((&ipn.Prefs{ Persist: &persist.Persist{ PrivateNodeKey: nodePriv, diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index 5654cf277..db01dd608 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -25,6 +25,7 @@ import ( "tailscale.com/tstest" "tailscale.com/types/logger" "tailscale.com/types/netmap" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" "tailscale.com/util/usermetric" "tailscale.com/wgengine" @@ -194,10 +195,9 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) { h.isSelf = false h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - sys := tsd.NewSystem() - t.Cleanup(sys.Bus.Get().Close) + sys := tsd.NewSystemWithBus(eventbustest.NewBus(t)) - ht := new(health.Tracker) + ht := health.NewTracker(sys.Bus.Get()) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) reg := new(usermetric.Registry) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) @@ -249,10 +249,9 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - sys := tsd.NewSystem() - t.Cleanup(sys.Bus.Get().Close) + sys := tsd.NewSystemWithBus(eventbustest.NewBus(t)) - ht := new(health.Tracker) + ht := health.NewTracker(sys.Bus.Get()) reg := new(usermetric.Registry) eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) @@ -323,11 +322,10 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - sys := tsd.NewSystem() - t.Cleanup(sys.Bus.Get().Close) + sys := tsd.NewSystemWithBus(eventbustest.NewBus(t)) rc := &appctest.RouteCollector{} - ht := new(health.Tracker) + ht := health.NewTracker(sys.Bus.Get()) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) reg := new(usermetric.Registry) @@ -392,10 +390,9 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { var h peerAPIHandler h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") - sys := tsd.NewSystem() - t.Cleanup(sys.Bus.Get().Close) + sys := tsd.NewSystemWithBus(eventbustest.NewBus(t)) - ht := new(health.Tracker) + ht := health.NewTracker(sys.Bus.Get()) reg := new(usermetric.Registry) rc := &appctest.RouteCollector{} eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set) diff --git a/ipn/ipnlocal/profiles.go b/ipn/ipnlocal/profiles.go index 1d312cfa6..6e1db4ff2 100644 --- a/ipn/ipnlocal/profiles.go +++ b/ipn/ipnlocal/profiles.go @@ -21,6 +21,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/logger" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" ) var debug = envknob.RegisterBool("TS_DEBUG_PROFILES") @@ -838,7 +839,9 @@ func (pm *profileManager) CurrentPrefs() ipn.PrefsView { // ReadStartupPrefsForTest reads the startup prefs from disk. It is only used for testing. func ReadStartupPrefsForTest(logf logger.Logf, store ipn.StateStore) (ipn.PrefsView, error) { - ht := new(health.Tracker) // in tests, don't care about the health status + bus := eventbus.New() + defer bus.Close() + ht := health.NewTracker(bus) // in tests, don't care about the health status pm, err := newProfileManager(store, logf, ht) if err != nil { return ipn.PrefsView{}, err diff --git a/ipn/ipnlocal/profiles_test.go b/ipn/ipnlocal/profiles_test.go index 52b095be1..8dce388bc 100644 --- a/ipn/ipnlocal/profiles_test.go +++ b/ipn/ipnlocal/profiles_test.go @@ -20,13 +20,14 @@ import ( "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/types/persist" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" ) func TestProfileCurrentUserSwitch(t *testing.T) { store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -63,7 +64,7 @@ func TestProfileCurrentUserSwitch(t *testing.T) { t.Fatalf("CurrentPrefs() = %v, want emptyPrefs", pm.CurrentPrefs().Pretty()) } - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -81,7 +82,7 @@ func TestProfileCurrentUserSwitch(t *testing.T) { func TestProfileList(t *testing.T) { store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -285,7 +286,7 @@ func TestProfileDupe(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -318,7 +319,7 @@ func TestProfileDupe(t *testing.T) { func TestProfileManagement(t *testing.T) { store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -416,7 +417,7 @@ func TestProfileManagement(t *testing.T) { t.Logf("Recreate profile manager from store") // Recreate the profile manager to ensure that it can load the profiles // from the store at startup. - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -432,7 +433,7 @@ func TestProfileManagement(t *testing.T) { t.Logf("Recreate profile manager from store after deleting default profile") // Recreate the profile manager to ensure that it can load the profiles // from the store at startup. - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -474,7 +475,7 @@ func TestProfileManagement(t *testing.T) { t.Fatal("SetPrefs failed to save auto-update setting") } // Re-load profiles to trigger migration for invalid auto-update value. - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatal(err) } @@ -496,7 +497,7 @@ func TestProfileManagementWindows(t *testing.T) { store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "windows") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "windows") if err != nil { t.Fatal(err) } @@ -565,7 +566,7 @@ func TestProfileManagementWindows(t *testing.T) { t.Logf("Recreate profile manager from store, should reset prefs") // Recreate the profile manager to ensure that it can load the profiles // from the store at startup. - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "windows") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "windows") if err != nil { t.Fatal(err) } @@ -588,7 +589,7 @@ func TestProfileManagementWindows(t *testing.T) { } // Recreate the profile manager to ensure that it starts with test profile. - pm, err = newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "windows") + pm, err = newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "windows") if err != nil { t.Fatal(err) } @@ -1091,7 +1092,7 @@ func TestProfileStateChangeCallback(t *testing.T) { t.Parallel() store := new(mem.Store) - pm, err := newProfileManagerWithGOOS(store, logger.Discard, new(health.Tracker), "linux") + pm, err := newProfileManagerWithGOOS(store, logger.Discard, health.NewTracker(eventbustest.NewBus(t)), "linux") if err != nil { t.Fatalf("newProfileManagerWithGOOS: %v", err) } diff --git a/ipn/ipnlocal/serve_test.go b/ipn/ipnlocal/serve_test.go index d18ee4db9..a081ed27b 100644 --- a/ipn/ipnlocal/serve_test.go +++ b/ipn/ipnlocal/serve_test.go @@ -900,7 +900,7 @@ func newTestBackend(t *testing.T, opts ...any) *LocalBackend { e, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), EventBus: sys.Bus.Get(), }) @@ -918,7 +918,7 @@ func newTestBackend(t *testing.T, opts ...any) *LocalBackend { dir := t.TempDir() b.SetVarRoot(dir) - pm := must.Get(newProfileManager(new(mem.Store), logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), logf, health.NewTracker(bus))) pm.currentProfile = (&ipn.LoginProfile{ID: "id0"}).View() b.pm = pm diff --git a/ipn/ipnlocal/ssh_test.go b/ipn/ipnlocal/ssh_test.go index 6e93b34f0..b24cd6732 100644 --- a/ipn/ipnlocal/ssh_test.go +++ b/ipn/ipnlocal/ssh_test.go @@ -13,6 +13,7 @@ import ( "tailscale.com/health" "tailscale.com/ipn/store/mem" "tailscale.com/tailcfg" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" ) @@ -50,7 +51,7 @@ type fakeSSHServer struct { } func TestGetSSHUsernames(t *testing.T) { - pm := must.Get(newProfileManager(new(mem.Store), t.Logf, new(health.Tracker))) + pm := must.Get(newProfileManager(new(mem.Store), t.Logf, health.NewTracker(eventbustest.NewBus(t)))) b := &LocalBackend{pm: pm, store: pm.Store()} b.sshServer = fakeSSHServer{} res, err := b.getSSHUsernames(new(tailcfg.C2NSSHUsernamesRequest)) diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go index 30538f2c8..ff21c920c 100644 --- a/ipn/ipnlocal/state_test.go +++ b/ipn/ipnlocal/state_test.go @@ -336,7 +336,7 @@ func TestStateMachine(t *testing.T) { sys := tsd.NewSystem() store := new(testStateStorage) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -974,7 +974,7 @@ func TestEditPrefsHasNoKeys(t *testing.T) { logf := tstest.WhileTestRunningLogger(t) sys := tsd.NewSystem() sys.Set(new(mem.Store)) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -1525,7 +1525,7 @@ func newLocalBackendWithMockEngineAndControl(t *testing.T, enableLogging bool) ( EventBus: sys.Bus.Get(), NetMon: dialer.NetMon(), Metrics: sys.UserMetricsRegistry(), - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), DisablePortMapper: true, }) if err != nil { diff --git a/ipn/lapitest/backend.go b/ipn/lapitest/backend.go index ddf48fb28..6a83431f3 100644 --- a/ipn/lapitest/backend.go +++ b/ipn/lapitest/backend.go @@ -33,7 +33,7 @@ func newBackend(opts *options) *ipnlocal.LocalBackend { sys.Set(&mem.Store{}) } - e, err := wgengine.NewFakeUserspaceEngine(opts.Logf(), sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + e, err := wgengine.NewFakeUserspaceEngine(opts.Logf(), sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { opts.tb.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/localapi/localapi_test.go b/ipn/localapi/localapi_test.go index 046eb744d..fa24717f7 100644 --- a/ipn/localapi/localapi_test.go +++ b/ipn/localapi/localapi_test.go @@ -339,7 +339,7 @@ func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend { sys := tsd.NewSystemWithBus(eventbustest.NewBus(t)) store := new(mem.Store) sys.Set(store) - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/net/dns/manager_tcp_test.go b/net/dns/manager_tcp_test.go index f4c42791e..46883a1e7 100644 --- a/net/dns/manager_tcp_test.go +++ b/net/dns/manager_tcp_test.go @@ -20,6 +20,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/tstest" "tailscale.com/util/dnsname" + "tailscale.com/util/eventbus/eventbustest" ) func mkDNSRequest(domain dnsname.FQDN, tp dns.Type, modify func(*dns.Builder)) []byte { @@ -89,7 +90,7 @@ func TestDNSOverTCP(t *testing.T) { SearchDomains: fqdns("coffee.shop"), }, } - m := NewManager(t.Logf, &f, new(health.Tracker), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") + m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") m.resolver.TestOnlySetHook(f.SetResolver) m.Set(Config{ Hosts: hosts( @@ -174,7 +175,7 @@ func TestDNSOverTCP_TooLarge(t *testing.T) { SearchDomains: fqdns("coffee.shop"), }, } - m := NewManager(log, &f, new(health.Tracker), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") + m := NewManager(log, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") m.resolver.TestOnlySetHook(f.SetResolver) m.Set(Config{ Hosts: hosts("andrew.ts.com.", "1.2.3.4"), diff --git a/net/dns/manager_test.go b/net/dns/manager_test.go index 522f9636a..b5a510862 100644 --- a/net/dns/manager_test.go +++ b/net/dns/manager_test.go @@ -19,6 +19,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/types/dnstype" "tailscale.com/util/dnsname" + "tailscale.com/util/eventbus/eventbustest" ) type fakeOSConfigurator struct { @@ -932,7 +933,7 @@ func TestManager(t *testing.T) { goos = "linux" } knobs := &controlknobs.Knobs{} - m := NewManager(t.Logf, &f, new(health.Tracker), tsdial.NewDialer(netmon.NewStatic()), nil, knobs, goos) + m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, knobs, goos) m.resolver.TestOnlySetHook(f.SetResolver) if err := m.Set(test.in); err != nil { @@ -1038,7 +1039,7 @@ func TestConfigRecompilation(t *testing.T) { SearchDomains: fqdns("foo.ts.net"), } - m := NewManager(t.Logf, f, new(health.Tracker), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "darwin") + m := NewManager(t.Logf, f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "darwin") var managerConfig *resolver.Config m.resolver.TestOnlySetHook(func(cfg resolver.Config) { diff --git a/net/dns/resolver/forwarder_test.go b/net/dns/resolver/forwarder_test.go index f7cda15f6..f77388ca7 100644 --- a/net/dns/resolver/forwarder_test.go +++ b/net/dns/resolver/forwarder_test.go @@ -29,7 +29,7 @@ import ( "tailscale.com/net/tsdial" "tailscale.com/tstest" "tailscale.com/types/dnstype" - "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" ) func (rr resolverAndDelay) String() string { @@ -455,8 +455,7 @@ func makeLargeResponse(tb testing.TB, domain string) (request, response []byte) func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports ...uint16) ([]byte, error) { logf := tstest.WhileTestRunningLogger(tb) - bus := eventbus.New() - defer bus.Close() + bus := eventbustest.NewBus(tb) netMon, err := netmon.New(bus, logf) if err != nil { tb.Fatal(err) @@ -465,7 +464,7 @@ func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports var dialer tsdial.Dialer dialer.SetNetMon(netMon) - fwd := newForwarder(logf, netMon, nil, &dialer, new(health.Tracker), nil) + fwd := newForwarder(logf, netMon, nil, &dialer, health.NewTracker(bus), nil) if modify != nil { modify(fwd) } diff --git a/net/dns/resolver/tsdns_test.go b/net/dns/resolver/tsdns_test.go index 4bbfd4d6a..0823ea139 100644 --- a/net/dns/resolver/tsdns_test.go +++ b/net/dns/resolver/tsdns_test.go @@ -31,7 +31,7 @@ import ( "tailscale.com/types/dnstype" "tailscale.com/types/logger" "tailscale.com/util/dnsname" - "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" ) var ( @@ -356,7 +356,7 @@ func newResolver(t testing.TB) *Resolver { return New(t.Logf, nil, // no link selector tsdial.NewDialer(netmon.NewStatic()), - new(health.Tracker), + health.NewTracker(eventbustest.NewBus(t)), nil, // no control knobs ) } @@ -1060,8 +1060,7 @@ func TestForwardLinkSelection(t *testing.T) { // routes differently. specialIP := netaddr.IPv4(1, 2, 3, 4) - bus := eventbus.New() - defer bus.Close() + bus := eventbustest.NewBus(t) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, ".... netmon: ")) if err != nil { @@ -1074,7 +1073,7 @@ func TestForwardLinkSelection(t *testing.T) { return "special" } return "" - }), new(tsdial.Dialer), new(health.Tracker), nil /* no control knobs */) + }), new(tsdial.Dialer), health.NewTracker(bus), nil /* no control knobs */) // Test non-special IP. if got, err := fwd.packetListener(netip.Addr{}); err != nil { diff --git a/net/tlsdial/tlsdial_test.go b/net/tlsdial/tlsdial_test.go index e2c4cdd4f..a288d7653 100644 --- a/net/tlsdial/tlsdial_test.go +++ b/net/tlsdial/tlsdial_test.go @@ -16,6 +16,7 @@ import ( "tailscale.com/health" "tailscale.com/net/bakedroots" + "tailscale.com/util/eventbus/eventbustest" ) func TestFallbackRootWorks(t *testing.T) { @@ -85,7 +86,7 @@ func TestFallbackRootWorks(t *testing.T) { }, DisableKeepAlives: true, // for test cleanup ease } - ht := new(health.Tracker) + ht := health.NewTracker(eventbustest.NewBus(t)) tr.TLSClientConfig = Config(ht, tr.TLSClientConfig) c := &http.Client{Transport: tr} diff --git a/ssh/tailssh/tailssh_test.go b/ssh/tailssh/tailssh_test.go index 96fb87f49..44b2d68df 100644 --- a/ssh/tailssh/tailssh_test.go +++ b/ssh/tailssh/tailssh_test.go @@ -1062,7 +1062,7 @@ func TestSSHAuthFlow(t *testing.T) { func TestSSH(t *testing.T) { logf := tstest.WhileTestRunningLogger(t) sys := tsd.NewSystem() - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker.Get(), sys.UserMetricsRegistry(), sys.Bus.Get()) if err != nil { t.Fatal(err) } diff --git a/tsd/tsd.go b/tsd/tsd.go index e4a512e4b..263b8de70 100644 --- a/tsd/tsd.go +++ b/tsd/tsd.go @@ -60,6 +60,7 @@ type System struct { DriveForLocal SubSystem[drive.FileSystemForLocal] DriveForRemote SubSystem[drive.FileSystemForRemote] PolicyClient SubSystem[policyclient.Client] + HealthTracker SubSystem[*health.Tracker] // InitialConfig is initial server config, if any. // It is nil if the node is not in declarative mode. @@ -74,7 +75,6 @@ type System struct { controlKnobs controlknobs.Knobs proxyMap proxymap.Mapper - healthTracker health.Tracker userMetricsRegistry usermetric.Registry } @@ -91,6 +91,10 @@ func NewSystemWithBus(bus *eventbus.Bus) *System { } sys := new(System) sys.Set(bus) + + tracker := health.NewTracker(bus) + sys.Set(tracker) + return sys } @@ -138,6 +142,8 @@ func (s *System) Set(v any) { s.DriveForRemote.Set(v) case policyclient.Client: s.PolicyClient.Set(v) + case *health.Tracker: + s.HealthTracker.Set(v) default: panic(fmt.Sprintf("unknown type %T", v)) } @@ -167,11 +173,6 @@ func (s *System) ProxyMapper() *proxymap.Mapper { return &s.proxyMap } -// HealthTracker returns the system health tracker. -func (s *System) HealthTracker() *health.Tracker { - return &s.healthTracker -} - // UserMetricsRegistry returns the system usermetrics. func (s *System) UserMetricsRegistry() *usermetric.Registry { return &s.userMetricsRegistry diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index d25da0996..d9b9b64c1 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -577,7 +577,7 @@ func (s *Server) start() (reterr error) { sys := tsd.NewSystem() s.sys = sys - if err := s.startLogger(&closePool, sys.HealthTracker(), tsLogf); err != nil { + if err := s.startLogger(&closePool, sys.HealthTracker.Get(), tsLogf); err != nil { return err } @@ -595,7 +595,7 @@ func (s *Server) start() (reterr error) { Dialer: s.dialer, SetSubsystem: sys.Set, ControlKnobs: sys.ControlKnobs(), - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), }) if err != nil { @@ -603,7 +603,7 @@ func (s *Server) start() (reterr error) { } closePool.add(s.dialer) sys.Set(eng) - sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry()) + sys.HealthTracker.Get().SetMetricsRegistry(sys.UserMetricsRegistry()) // TODO(oxtoacart): do we need to support Taildrive on tsnet, and if so, how? ns, err := netstack.Create(tsLogf, sys.Tun.Get(), eng, sys.MagicSock.Get(), s.dialer, sys.DNSManager.Get(), sys.ProxyMapper()) diff --git a/util/eventbus/eventbustest/eventbustest.go b/util/eventbus/eventbustest/eventbustest.go index b7375adc4..af725ace1 100644 --- a/util/eventbus/eventbustest/eventbustest.go +++ b/util/eventbus/eventbustest/eventbustest.go @@ -100,7 +100,7 @@ func Expect(tw *Watcher, filters ...any) error { case <-time.After(tw.TimeOut): return fmt.Errorf( "timed out waiting for event, saw %d events, %d was expected", - eventCount, head) + eventCount, len(filters)) case <-tw.chDone: return errors.New("watcher closed while waiting for events") } @@ -138,7 +138,7 @@ func ExpectExactly(tw *Watcher, filters ...any) error { case <-time.After(tw.TimeOut): return fmt.Errorf( "timed out waiting for event, saw %d events, %d was expected", - eventCount, pos) + eventCount, len(filters)) case <-tw.chDone: return errors.New("watcher closed while waiting for events") } diff --git a/wgengine/bench/wg.go b/wgengine/bench/wg.go index 9b195bdb7..4de7677f2 100644 --- a/wgengine/bench/wg.go +++ b/wgengine/bench/wg.go @@ -53,7 +53,7 @@ func setupWGTest(b *testing.B, logf logger.Logf, traf *TrafficGen, a1, a2 netip. ListenPort: 0, Tun: t1, SetSubsystem: s1.Set, - HealthTracker: s1.HealthTracker(), + HealthTracker: s1.HealthTracker.Get(), }) if err != nil { log.Fatalf("e1 init: %v", err) @@ -80,7 +80,7 @@ func setupWGTest(b *testing.B, logf logger.Logf, traf *TrafficGen, a1, a2 netip. ListenPort: 0, Tun: t2, SetSubsystem: s2.Set, - HealthTracker: s2.HealthTracker(), + HealthTracker: s2.HealthTracker.Get(), }) if err != nil { log.Fatalf("e2 init: %v", err) diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index bb5922c8c..1b885c3f1 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -67,6 +67,7 @@ import ( "tailscale.com/util/cibuild" "tailscale.com/util/clientmetric" "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" "tailscale.com/util/racebuild" "tailscale.com/util/set" @@ -179,14 +180,13 @@ func newMagicStack(t testing.TB, logf logger.Logf, l nettype.PacketListener, der func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListener, derpMap *tailcfg.DERPMap, privateKey key.NodePrivate) *magicStack { t.Helper() - bus := eventbus.New() - t.Cleanup(bus.Close) + bus := eventbustest.NewBus(t) netMon, err := netmon.New(bus, logf) if err != nil { t.Fatalf("netmon.New: %v", err) } - ht := new(health.Tracker) + ht := health.NewTracker(bus) var reg usermetric.Registry epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary @@ -1352,8 +1352,7 @@ func newTestConn(t testing.TB) *Conn { t.Helper() port := pickPort(t) - bus := eventbus.New() - t.Cleanup(bus.Close) + bus := eventbustest.NewBus(t) netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: ")) if err != nil { @@ -1364,7 +1363,7 @@ func newTestConn(t testing.TB) *Conn { conn, err := NewConn(Options{ NetMon: netMon, EventBus: bus, - HealthTracker: new(health.Tracker), + HealthTracker: health.NewTracker(bus), Metrics: new(usermetric.Registry), DisablePortMapper: true, Logf: t.Logf, @@ -3038,7 +3037,7 @@ func TestMaybeSetNearestDERP(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - ht := new(health.Tracker) + ht := health.NewTracker(eventbustest.NewBus(t)) c := newConn(t.Logf) c.myDerp = tt.old c.derpMap = derpMap diff --git a/wgengine/netstack/netstack_test.go b/wgengine/netstack/netstack_test.go index 584b3babc..93022811c 100644 --- a/wgengine/netstack/netstack_test.go +++ b/wgengine/netstack/netstack_test.go @@ -50,7 +50,7 @@ func TestInjectInboundLeak(t *testing.T) { Tun: tunDev, Dialer: dialer, SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), EventBus: sys.Bus.Get(), }) @@ -110,7 +110,7 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl { Tun: tunDev, Dialer: dialer, SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), EventBus: sys.Bus.Get(), }) diff --git a/wgengine/router/router_linux_test.go b/wgengine/router/router_linux_test.go index b6a5a1ac0..3b1eb7db6 100644 --- a/wgengine/router/router_linux_test.go +++ b/wgengine/router/router_linux_test.go @@ -375,7 +375,7 @@ ip route add throw 192.168.0.0/24 table 52` + basic, defer mon.Close() fake := NewFakeOS(t) - ht := new(health.Tracker) + ht := health.NewTracker(bus) router, err := newUserspaceRouterAdvanced(t.Logf, "tailscale0", mon, fake, ht, bus) router.(*linuxRouter).nfr = fake.nfr if err != nil { diff --git a/wgengine/userspace_ext_test.go b/wgengine/userspace_ext_test.go index 5e7d1ce6a..8e7bbb7a9 100644 --- a/wgengine/userspace_ext_test.go +++ b/wgengine/userspace_ext_test.go @@ -21,7 +21,7 @@ func TestIsNetstack(t *testing.T) { tstest.WhileTestRunningLogger(t), wgengine.Config{ SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + HealthTracker: sys.HealthTracker.Get(), Metrics: sys.UserMetricsRegistry(), EventBus: sys.Bus.Get(), }, @@ -73,7 +73,7 @@ func TestIsNetstackRouter(t *testing.T) { } conf := tt.conf conf.SetSubsystem = sys.Set - conf.HealthTracker = sys.HealthTracker() + conf.HealthTracker = sys.HealthTracker.Get() conf.Metrics = sys.UserMetricsRegistry() conf.EventBus = sys.Bus.Get() e, err := wgengine.NewUserspaceEngine(logger.Discard, conf) diff --git a/wgengine/userspace_test.go b/wgengine/userspace_test.go index 87a36c673..89d75b98a 100644 --- a/wgengine/userspace_test.go +++ b/wgengine/userspace_test.go @@ -25,7 +25,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/netmap" "tailscale.com/types/opt" - "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/usermetric" "tailscale.com/wgengine/router" "tailscale.com/wgengine/wgcfg" @@ -101,10 +101,9 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView { } func TestUserspaceEngineReconfig(t *testing.T) { - bus := eventbus.New() - defer bus.Close() + bus := eventbustest.NewBus(t) - ht := new(health.Tracker) + ht := health.NewTracker(bus) reg := new(usermetric.Registry) e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus) if err != nil { @@ -170,12 +169,11 @@ func TestUserspaceEnginePortReconfig(t *testing.T) { var knobs controlknobs.Knobs - bus := eventbus.New() - defer bus.Close() + bus := eventbustest.NewBus(t) // Keep making a wgengine until we find an unused port var ue *userspaceEngine - ht := new(health.Tracker) + ht := health.NewTracker(bus) reg := new(usermetric.Registry) for i := range 100 { attempt := uint16(defaultPort + i) @@ -258,9 +256,8 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) { var knobs controlknobs.Knobs - bus := eventbus.New() - defer bus.Close() - ht := new(health.Tracker) + bus := eventbustest.NewBus(t) + ht := health.NewTracker(bus) reg := new(usermetric.Registry) e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg, bus) if err != nil { diff --git a/wgengine/watchdog_test.go b/wgengine/watchdog_test.go index a54a0d3fa..35fd8f331 100644 --- a/wgengine/watchdog_test.go +++ b/wgengine/watchdog_test.go @@ -9,7 +9,7 @@ import ( "time" "tailscale.com/health" - "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/usermetric" ) @@ -25,9 +25,8 @@ func TestWatchdog(t *testing.T) { t.Run("default watchdog does not fire", func(t *testing.T) { t.Parallel() - bus := eventbus.New() - defer bus.Close() - ht := new(health.Tracker) + bus := eventbustest.NewBus(t) + ht := health.NewTracker(bus) reg := new(usermetric.Registry) e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg, bus) if err != nil {