From 01d0bdd25366a095ec5a07e14baaef438d309f2b Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 29 Apr 2026 10:29:09 -0700 Subject: [PATCH] cmd/derper,derp: add metrics for rate limit hits (#19560) Expvars track count of rate limiters exceeding their threshold. Covers (1) global rate limiter and (2) total of local rate limiters. Also publish optional rate-limit metrics during ExpVar() call if -rate-config is specified. Fixes current rate-limit metrics being published outside of "derp" in /debug/vars. Updates tailscale/corp#38509 Change-Id: Ic7f5a1e890d0d7d3d7b679daa4b5f8926a6a6964 Signed-off-by: Alex Valiushko --- cmd/derper/derper.go | 2 +- derp/derp_test.go | 2 +- derp/derpserver/derpserver.go | 99 +++++++++++++++++++++--------- derp/derpserver/derpserver_test.go | 91 +++++++++++++++++++++++++++ 4 files changed, 162 insertions(+), 32 deletions(-) diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go index b4178c8d0..745d887f8 100644 --- a/cmd/derper/derper.go +++ b/cmd/derper/derper.go @@ -252,7 +252,7 @@ func main() { if err := startMesh(s); err != nil { log.Fatalf("startMesh: %v", err) } - expvar.Publish("derp", s.ExpVar()) + expvar.Publish("derp", s.ExpVar(*rateConfigPath != "")) handleHome, ok := getHomeHandler(*flagHome) if !ok { diff --git a/derp/derp_test.go b/derp/derp_test.go index 24d509944..0edbaff17 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -353,7 +353,7 @@ func TestSendRecv(t *testing.T) { } } - serverMetrics := s.ExpVar().(*metrics.Set) + serverMetrics := s.ExpVar(false).(*metrics.Set) wantActive := func(total, home int64) { t.Helper() diff --git a/derp/derpserver/derpserver.go b/derp/derpserver/derpserver.go index 4e27de84a..7c8f0bd6b 100644 --- a/derp/derpserver/derpserver.go +++ b/derp/derpserver/derpserver.go @@ -172,6 +172,9 @@ type Server struct { meshUpdateBatchSize *metrics.Histogram meshUpdateLoopCount *metrics.Histogram bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush + rateLimitGlobalWaited expvar.Int // number of times global rate limit caused a wait + rateLimitPerClientWaited expvar.Int // number of times per-client rate limit caused a wait + // TODO(illotum): add metrics for rate limited wait time, consider total seconds vs a histogram. // verifyClientsLocalTailscaled only accepts client connections to the DERP // server if the clientKey is a known peer in the network, as specified by a @@ -566,37 +569,11 @@ func (s *Server) LoadAndApplyRateConfig(path string) error { return nil } -var publishRateLimitsMetricsOnce sync.Once - -func (s *Server) publishRateLimitsMetrics() { - // Rate limiting is currently experimental, its APIs are unstable, and it must - // be opted-in via --rate-config. Therefore, we only publish related metrics - // on demand, to avoid polluting uninterested metrics consumers. - // - // Note: The [sync.Once] is package-level, and the [expvar.Var] closures - // capture [Server], so first [Server] owns these for process lifetime. - publishRateLimitsMetricsOnce.Do(func() { - expvar.Publish("derp_per_client_rate_limit_bytes_per_second", s.expVarFunc(func() any { - return s.rateConfig.PerClientRateLimitBytesPerSec - })) - expvar.Publish("derp_per_client_rate_burst_bytes", s.expVarFunc(func() any { - return s.rateConfig.PerClientRateBurstBytes - })) - expvar.Publish("derp_global_rate_limit_bytes_per_second", s.expVarFunc(func() any { - return s.rateConfig.GlobalRateLimitBytesPerSec - })) - expvar.Publish("derp_global_rate_burst_bytes", s.expVarFunc(func() any { - return s.rateConfig.GlobalRateBurstBytes - })) - }) -} - // UpdateRateLimits sets the receive rate limits, updating all existing client // connections. It returns the applied config, which may differ from rc. If the // per-client rate limit is 0, rate limiting is disabled. Mesh peers are always // exempt from rate limiting. func (s *Server) UpdateRateLimits(rc RateConfig) (applied RateConfig) { - s.publishRateLimitsMetrics() s.mu.Lock() defer s.mu.Unlock() if rc.PerClientRateLimitBytesPerSec == 0 { @@ -1374,7 +1351,33 @@ func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter) c.recvLim.Store(lim) } -// rateLimit applies the per-client receive rate limit. +// rateLimitWait is a reimplementation of [xrate.Limiter.WaitN] via [xrate.Limiter.ReserveN]. +// It returns the duration waited for tokens to become available. +func rateLimitWait(ctx context.Context, lim *xrate.Limiter, n int, now time.Time, newTimer func(time.Duration) (<-chan time.Time, func() bool)) (time.Duration, error) { + r := lim.ReserveN(now, n) + if !r.OK() { + return 0, fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.Burst()) + } + delay := r.DelayFrom(now) + if delay == 0 { + return 0, nil + } + ch, stop := newTimer(delay) + defer stop() + select { + case <-ch: + // Note: We return the predicted delay as wall-clock duration. May be not the same. + return delay, nil + case <-ctx.Done(): + r.Cancel() + return 0, ctx.Err() + } +} + +// rateLimit applies the receive rate limit. +// Per-client rate limiting is applied before global. +// The former lets us differentiate classes of service, +// the latter sets the overall pace of reading. // By limiting here we prevent reading from the buffered reader // [sclient.br] if the limit has been exceeded. Any reads done here provide space // within the buffered reader to fill back in with data from @@ -1384,6 +1387,10 @@ func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter) // and this is a no-op. func (c *sclient) rateLimit(n int) error { if lim := c.recvLim.Load(); lim != nil { + newTimer := func(d time.Duration) (<-chan time.Time, func() bool) { + tc, ch := c.s.clock.NewTimer(d) + return ch, tc.Stop + } // If n exceeds the capacity of the bucket, then WaitN will return // an error and consume zero tokens. To prevent this, clamp n to // [minRateLimitTokenBucketSize]. @@ -1394,12 +1401,25 @@ func (c *sclient) rateLimit(n int) error { // 2. is only partially read off the socket (bufio) // 3. would cause the connection to close shortly after rate limiting, anyway. clampedN := min(n, minRateLimitTokenBucketSize) - err := lim.child.WaitN(c.ctx, clampedN) + now := c.s.clock.Now() + durationWaited, err := rateLimitWait(c.ctx, lim.child, clampedN, now, newTimer) if err != nil { return err } + if durationWaited > 0 { + c.s.rateLimitPerClientWaited.Add(1) + } if lim.parent != nil { - return lim.parent.WaitN(c.ctx, clampedN) + if durationWaited > 0 { + now = c.s.clock.Now() // update 'now' if we already waited + } + durationWaited, err = rateLimitWait(c.ctx, lim.parent, clampedN, now, newTimer) + if err != nil { + return err + } + if durationWaited > 0 { + c.s.rateLimitGlobalWaited.Add(1) + } } } return nil @@ -2393,7 +2413,7 @@ func (s *Server) expVarFunc(f func() any) expvar.Func { } // ExpVar returns an expvar variable suitable for registering with expvar.Publish. -func (s *Server) ExpVar() expvar.Var { +func (s *Server) ExpVar(rateLimitEnabled bool) expvar.Var { m := new(metrics.Set) m.Set("gauge_memstats_sys0", expvar.Func(func() any { return int64(s.memSys0) })) m.Set("gauge_watchers", s.expVarFunc(func() any { return len(s.watchers) })) @@ -2436,6 +2456,25 @@ func (s *Server) ExpVar() expvar.Var { var expvarVersion expvar.String expvarVersion.Set(version.Long()) m.Set("version", &expvarVersion) + if rateLimitEnabled { + // Rate limiting is currently experimental, its APIs are unstable, and it must + // be opted-in via --rate-config. Therefore, we only publish related metrics + // on demand, to avoid polluting uninterested metrics consumers. + m.Set("rate_limit_per_client_bytes_per_second", s.expVarFunc(func() any { + return s.rateConfig.PerClientRateLimitBytesPerSec + })) + m.Set("rate_limit_per_client_burst_bytes", s.expVarFunc(func() any { + return s.rateConfig.PerClientRateBurstBytes + })) + m.Set("rate_limit_global_bytes_per_second", s.expVarFunc(func() any { + return s.rateConfig.GlobalRateLimitBytesPerSec + })) + m.Set("rate_limit_global_burst_bytes", s.expVarFunc(func() any { + return s.rateConfig.GlobalRateBurstBytes + })) + m.Set("rate_limit_per_client_waited", &s.rateLimitPerClientWaited) + m.Set("rate_limit_global_waited", &s.rateLimitGlobalWaited) + } return m } diff --git a/derp/derpserver/derpserver_test.go b/derp/derpserver/derpserver_test.go index 9bd631f3b..a5c0e025d 100644 --- a/derp/derpserver/derpserver_test.go +++ b/derp/derpserver/derpserver_test.go @@ -962,8 +962,12 @@ func TestPerClientRateLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) + s := New(key.NewNode(), logger.Discard) + defer s.Close() + c := &sclient{ ctx: ctx, + s: s, } lim := &parentChildTokenBuckets{ // Set parent limit to half of child to enable verification of @@ -1029,6 +1033,15 @@ func TestPerClientRateLimit(t *testing.T) { } wantTokens(t, 0, minRateLimitTokenBucketSize) + + // The second rateLimit call had to wait for both child and parent + // buckets, so both counters should be 1. + if got := s.rateLimitPerClientWaited.Value(); got != 1 { + t.Fatalf("rateLimitPerClientWaited = %d, want 1", got) + } + if got := s.rateLimitGlobalWaited.Value(); got != 1 { + t.Fatalf("rateLimitGlobalWaited = %d, want 1", got) + } }) }) @@ -1036,8 +1049,12 @@ func TestPerClientRateLimit(t *testing.T) { synctest.Test(t, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + s := New(key.NewNode(), logger.Discard) + defer s.Close() + c := &sclient{ ctx: ctx, + s: s, } lim := &parentChildTokenBuckets{ child: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize), @@ -1095,6 +1112,80 @@ func TestPerClientRateLimit(t *testing.T) { }) } +// zeroTimer returns a timer that fires immediately. +func zeroTimer(_ time.Duration) (<-chan time.Time, func() bool) { + t := time.NewTimer(0) + return t.C, t.Stop +} + +// neverTimer returns a timer that never fires. +func neverTimer(_ time.Duration) (<-chan time.Time, func() bool) { + return make(chan time.Time), func() bool { return false } +} + +func TestRateLimitWait(t *testing.T) { + ctx := context.Background() + + t.Run("no_wait", func(t *testing.T) { + lim := rate.NewLimiter(10, 10) + waited, err := rateLimitWait(ctx, lim, 5, time.Now(), zeroTimer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if waited != 0 { + t.Fatalf("waited = %v, want 0", waited) + } + }) + + t.Run("wait_for_tokens", func(t *testing.T) { + lim := rate.NewLimiter(10, 10) + now := time.Now() + waited, err := rateLimitWait(ctx, lim, 10, now, zeroTimer) // exhaust all tokens + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if waited != 0 { + t.Fatalf("waited = %v, want 0", waited) + } + waited, err = rateLimitWait(ctx, lim, 10, now, zeroTimer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if waited == 0 { + t.Fatal("waited = 0, want > 0") + } + }) + + t.Run("context_canceled", func(t *testing.T) { + lim := rate.NewLimiter(10, 10) + now := time.Now() + _, err := rateLimitWait(ctx, lim, 10, now, zeroTimer) // exhaust all tokens + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + canceled, cancel := context.WithCancel(ctx) // cancel context so the select picks ctx.Done() + cancel() + waited, err := rateLimitWait(canceled, lim, 10, now, neverTimer) // neverTimer to only unblock via context + if err == nil { + t.Fatal("expected error from canceled context") + } + if waited != 0 { + t.Fatalf("waited = %v, want 0", waited) + } + }) + + t.Run("n_exceeds_burst", func(t *testing.T) { + lim := rate.NewLimiter(10, 5) + waited, err := rateLimitWait(ctx, lim, 10, time.Now(), zeroTimer) + if err == nil { + t.Fatal("expected error when n > burst") + } + if waited != 0 { + t.Fatalf("waited = %v, want 0", waited) + } + }) +} + func verifyLimiter(t *testing.T, lim *parentChildTokenBuckets, wantRateConfig RateConfig) { t.Helper() if got := lim.child.Limit(); got != rate.Limit(wantRateConfig.PerClientRateLimitBytesPerSec) {