From bedef0e4b77472b320ed615de528f3cc44438816 Mon Sep 17 00:00:00 2001 From: James Tucker Date: Fri, 6 Feb 2026 01:06:41 -0800 Subject: [PATCH] cmd/tailscale: add event bus queue depth debugging Under extremely high load it appears we may have some retention issues as a result of queue depth build up, but there is currently no direct way to observe this. The scenario does not trigger the slow subscriber log message, and the event stream debugging endpoint produces a saturating volume of information. Updates tailscale/corp#36904 Signed-off-by: James Tucker --- client/local/local.go | 5 ++++ cmd/tailscale/cli/debug.go | 15 ++++++++++ ipn/localapi/debug.go | 58 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/client/local/local.go b/client/local/local.go index 195a91b1e..139f1a57d 100644 --- a/client/local/local.go +++ b/client/local/local.go @@ -446,6 +446,11 @@ func (lc *Client) EventBusGraph(ctx context.Context) ([]byte, error) { return lc.get200(ctx, "/localapi/v0/debug-bus-graph") } +// EventBusQueues returns a JSON snapshot of event bus queue depths per client. +func (lc *Client) EventBusQueues(ctx context.Context) ([]byte, error) { + return lc.get200(ctx, "/localapi/v0/debug-bus-queues") +} + // StreamBusEvents returns an iterator of Tailscale bus events as they arrive. // Each pair is a valid event and a nil error, or a zero event a non-nil error. // In case of error, the iterator ends after the pair reporting the error. diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index ccbfb59de..a7d81be29 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -124,6 +124,12 @@ func debugCmd() *ffcli.Command { return fs })(), }, + { + Name: "daemon-bus-queues", + ShortUsage: "tailscale debug daemon-bus-queues", + Exec: runDaemonBusQueues, + ShortHelp: "Print event bus queue depths per client", + }, { Name: "metrics", ShortUsage: "tailscale debug metrics", @@ -840,6 +846,15 @@ func runDaemonBusGraph(ctx context.Context, args []string) error { return nil } +func runDaemonBusQueues(ctx context.Context, args []string) error { + data, err := localClient.EventBusQueues(ctx) + if err != nil { + return err + } + fmt.Print(string(data)) + return nil +} + // generateDOTGraph generates the DOT graph format based on the events func generateDOTGraph(topics []eventbus.DebugTopic) string { var sb strings.Builder diff --git a/ipn/localapi/debug.go b/ipn/localapi/debug.go index ae9cb01e0..5f9032bc0 100644 --- a/ipn/localapi/debug.go +++ b/ipn/localapi/debug.go @@ -6,6 +6,7 @@ package localapi import ( + "cmp" "context" "encoding/json" "fmt" @@ -35,6 +36,7 @@ func init() { Register("dev-set-state-store", (*Handler).serveDevSetStateStore) Register("debug-bus-events", (*Handler).serveDebugBusEvents) Register("debug-bus-graph", (*Handler).serveEventBusGraph) + Register("debug-bus-queues", (*Handler).serveDebugBusQueues) Register("debug-derp-region", (*Handler).serveDebugDERPRegion) Register("debug-dial-types", (*Handler).serveDebugDialTypes) Register("debug-log", (*Handler).serveDebugLog) @@ -424,6 +426,62 @@ func (h *Handler) serveEventBusGraph(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(topics) } +func (h *Handler) serveDebugBusQueues(w http.ResponseWriter, r *http.Request) { + if r.Method != httpm.GET { + http.Error(w, "GET required", http.StatusMethodNotAllowed) + return + } + + bus, ok := h.LocalBackend().Sys().Bus.GetOK() + if !ok { + http.Error(w, "event bus not running", http.StatusPreconditionFailed) + return + } + + debugger := bus.Debugger() + + type clientQueue struct { + Name string `json:"name"` + SubscribeDepth int `json:"subscribeDepth"` + SubscribeTypes []string `json:"subscribeTypes,omitempty"` + PublishTypes []string `json:"publishTypes,omitempty"` + } + + publishQueue := debugger.PublishQueue() + clients := debugger.Clients() + result := struct { + PublishQueueDepth int `json:"publishQueueDepth"` + Clients []clientQueue `json:"clients"` + }{ + PublishQueueDepth: len(publishQueue), + } + + for _, c := range clients { + sq := debugger.SubscribeQueue(c) + cq := clientQueue{ + Name: c.Name(), + SubscribeDepth: len(sq), + } + for _, t := range debugger.SubscribeTypes(c) { + cq.SubscribeTypes = append(cq.SubscribeTypes, t.String()) + } + for _, t := range debugger.PublishTypes(c) { + cq.PublishTypes = append(cq.PublishTypes, t.String()) + } + result.Clients = append(result.Clients, cq) + } + + slices.SortFunc(result.Clients, func(a, b clientQueue) int { + if a.SubscribeDepth != b.SubscribeDepth { + return b.SubscribeDepth - a.SubscribeDepth + } + return cmp.Compare(a.Name, b.Name) + }) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) +} + func (h *Handler) serveDebugLog(w http.ResponseWriter, r *http.Request) { if !buildfeatures.HasLogTail { http.Error(w, feature.ErrUnavailable.Error(), http.StatusNotImplemented)