cmd/derper,derp: add metrics for rate limit hits (#19560)

Expvars count how often rate limiters exceed their threshold and force a
wait: (1) the global rate limiter and (2) the total across per-client
rate limiters.

Also publish the optional rate-limit metrics during the ExpVar() call
when -rate-config is specified. This fixes the current rate-limit metrics
being published outside of "derp" in /debug/vars.

Updates tailscale/corp#38509

Change-Id: Ic7f5a1e890d0d7d3d7b679daa4b5f8926a6a6964
Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>
Alex Valiushko 2026-04-29 10:29:09 -07:00 committed by GitHub
parent be7cce74ba
commit 01d0bdd253
4 changed files with 162 additions and 32 deletions
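
For orientation before the diffs: ExpVar now takes a bool, and when it is
true the rate-limit vars are nested inside the "derp" metrics.Set rather
than registered as top-level derp_* expvars. A minimal sketch of the wiring,
assuming the server constructor used by the tests below is exported as
derp.New (that name, and calling it from outside the package, are
assumptions, not something this diff shows):

package main

import (
	"expvar"
	"fmt"

	"tailscale.com/derp"
	"tailscale.com/types/key"
	"tailscale.com/types/logger"
)

func main() {
	// Assumed constructor name, mirroring the New(...) calls in the tests below.
	s := derp.New(key.NewNode(), logger.Discard)
	defer s.Close()

	// Mirrors derper's gating in the first hunk: rate-limit metrics are
	// included only when -rate-config was supplied (hard-coded to true here).
	expvar.Publish("derp", s.ExpVar(true))

	// The rate-limit vars (e.g. "rate_limit_global_waited") now appear
	// nested under "derp" in /debug/vars instead of as top-level expvars.
	fmt.Println(expvar.Get("derp").String())
}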


@@ -252,7 +252,7 @@ func main() {
	if err := startMesh(s); err != nil {
		log.Fatalf("startMesh: %v", err)
	}
-	expvar.Publish("derp", s.ExpVar())
+	expvar.Publish("derp", s.ExpVar(*rateConfigPath != ""))
	handleHome, ok := getHomeHandler(*flagHome)
	if !ok {


@@ -353,7 +353,7 @@ func TestSendRecv(t *testing.T) {
		}
	}
-	serverMetrics := s.ExpVar().(*metrics.Set)
+	serverMetrics := s.ExpVar(false).(*metrics.Set)
	wantActive := func(total, home int64) {
		t.Helper()


@@ -172,6 +172,9 @@ type Server struct {
	meshUpdateBatchSize *metrics.Histogram
	meshUpdateLoopCount *metrics.Histogram
	bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+	rateLimitGlobalWaited    expvar.Int // number of times global rate limit caused a wait
+	rateLimitPerClientWaited expvar.Int // number of times per-client rate limit caused a wait
+	// TODO(illotum): add metrics for rate limited wait time, consider total seconds vs a histogram.

	// verifyClientsLocalTailscaled only accepts client connections to the DERP
	// server if the clientKey is a known peer in the network, as specified by a
@@ -566,37 +569,11 @@ func (s *Server) LoadAndApplyRateConfig(path string) error {
	return nil
}

-var publishRateLimitsMetricsOnce sync.Once
-
-func (s *Server) publishRateLimitsMetrics() {
-	// Rate limiting is currently experimental, its APIs are unstable, and it must
-	// be opted-in via --rate-config. Therefore, we only publish related metrics
-	// on demand, to avoid polluting uninterested metrics consumers.
-	//
-	// Note: The [sync.Once] is package-level, and the [expvar.Var] closures
-	// capture [Server], so first [Server] owns these for process lifetime.
-	publishRateLimitsMetricsOnce.Do(func() {
-		expvar.Publish("derp_per_client_rate_limit_bytes_per_second", s.expVarFunc(func() any {
-			return s.rateConfig.PerClientRateLimitBytesPerSec
-		}))
-		expvar.Publish("derp_per_client_rate_burst_bytes", s.expVarFunc(func() any {
-			return s.rateConfig.PerClientRateBurstBytes
-		}))
-		expvar.Publish("derp_global_rate_limit_bytes_per_second", s.expVarFunc(func() any {
-			return s.rateConfig.GlobalRateLimitBytesPerSec
-		}))
-		expvar.Publish("derp_global_rate_burst_bytes", s.expVarFunc(func() any {
-			return s.rateConfig.GlobalRateBurstBytes
-		}))
-	})
-}
-
// UpdateRateLimits sets the receive rate limits, updating all existing client
// connections. It returns the applied config, which may differ from rc. If the
// per-client rate limit is 0, rate limiting is disabled. Mesh peers are always
// exempt from rate limiting.
func (s *Server) UpdateRateLimits(rc RateConfig) (applied RateConfig) {
-	s.publishRateLimitsMetrics()
	s.mu.Lock()
	defer s.mu.Unlock()
	if rc.PerClientRateLimitBytesPerSec == 0 {
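
A hedged usage sketch of the semantics documented above (the RateConfig
field names come from this diff; the numeric literal types and the exported
derp.RateConfig and derp.New names are assumptions):

package main

import (
	"fmt"

	"tailscale.com/derp"
	"tailscale.com/types/key"
	"tailscale.com/types/logger"
)

func main() {
	s := derp.New(key.NewNode(), logger.Discard) // assumed exported constructor
	defer s.Close()

	applied := s.UpdateRateLimits(derp.RateConfig{
		PerClientRateLimitBytesPerSec: 1 << 20, // 1 MiB/s per client
		PerClientRateBurstBytes:       1 << 20,
		GlobalRateLimitBytesPerSec:    64 << 20,
		GlobalRateBurstBytes:          64 << 20,
	})
	// Per the doc comment above: applied may differ from the requested
	// config, a zero PerClientRateLimitBytesPerSec disables rate limiting,
	// and mesh peers are never limited.
	fmt.Printf("applied: %+v\n", applied)
}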
@@ -1374,7 +1351,33 @@ func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter)
	c.recvLim.Store(lim)
}

-// rateLimit applies the per-client receive rate limit.
+// rateLimitWait is a reimplementation of [xrate.Limiter.WaitN] via [xrate.Limiter.ReserveN].
+// It returns the duration waited for tokens to become available.
+func rateLimitWait(ctx context.Context, lim *xrate.Limiter, n int, now time.Time, newTimer func(time.Duration) (<-chan time.Time, func() bool)) (time.Duration, error) {
+	r := lim.ReserveN(now, n)
+	if !r.OK() {
+		return 0, fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.Burst())
+	}
+	delay := r.DelayFrom(now)
+	if delay == 0 {
+		return 0, nil
+	}
+	ch, stop := newTimer(delay)
+	defer stop()
+	select {
+	case <-ch:
+		// Note: We return the predicted delay, which may differ from the
+		// wall-clock time actually waited.
+		return delay, nil
+	case <-ctx.Done():
+		r.Cancel()
+		return 0, ctx.Err()
+	}
+}
+
+// rateLimit applies the receive rate limit. Per-client rate limiting is
+// applied before global: the former lets us differentiate classes of
+// service, the latter sets the overall pace of reading.
// By limiting here we prevent reading from the buffered reader
// [sclient.br] if the limit has been exceeded. Any reads done here provide space
// within the buffered reader to fill back in with data from
@@ -1384,6 +1387,10 @@ func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter)
// and this is a no-op.
func (c *sclient) rateLimit(n int) error {
	if lim := c.recvLim.Load(); lim != nil {
+		newTimer := func(d time.Duration) (<-chan time.Time, func() bool) {
+			tc, ch := c.s.clock.NewTimer(d)
+			return ch, tc.Stop
+		}
		// If n exceeds the capacity of the bucket, then WaitN will return
		// an error and consume zero tokens. To prevent this, clamp n to
		// [minRateLimitTokenBucketSize].
@@ -1394,12 +1401,25 @@ func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter)
		// 2. is only partially read off the socket (bufio)
		// 3. would cause the connection to close shortly after rate limiting, anyway.
		clampedN := min(n, minRateLimitTokenBucketSize)
-		err := lim.child.WaitN(c.ctx, clampedN)
+		now := c.s.clock.Now()
+		durationWaited, err := rateLimitWait(c.ctx, lim.child, clampedN, now, newTimer)
		if err != nil {
			return err
		}
+		if durationWaited > 0 {
+			c.s.rateLimitPerClientWaited.Add(1)
+		}
		if lim.parent != nil {
-			return lim.parent.WaitN(c.ctx, clampedN)
+			if durationWaited > 0 {
+				now = c.s.clock.Now() // update 'now' if we already waited
+			}
+			durationWaited, err = rateLimitWait(c.ctx, lim.parent, clampedN, now, newTimer)
+			if err != nil {
+				return err
+			}
+			if durationWaited > 0 {
+				c.s.rateLimitGlobalWaited.Add(1)
+			}
		}
	}
	return nil
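
The child-then-parent ordering above can be demonstrated in isolation with
plain x/time/rate limiters. A minimal standalone sketch (not the server's
actual types or numbers): the per-client limiter shapes one class of
service, while the shared global limiter sets the overall read pace and is
only consulted once the client's own budget allows the read.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	global := rate.NewLimiter(1024, 1024)  // shared: overall pace of reading
	perClient := rate.NewLimiter(512, 512) // this client's class of service

	ctx := context.Background()
	start := time.Now()
	for i := range 3 {
		// Per-client first, then global, mirroring sclient.rateLimit above.
		if err := perClient.WaitN(ctx, 512); err != nil {
			panic(err)
		}
		if err := global.WaitN(ctx, 512); err != nil {
			panic(err)
		}
		fmt.Printf("read %d allowed after %v\n", i, time.Since(start).Round(100*time.Millisecond))
	}
}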
@@ -2393,7 +2413,7 @@ func (s *Server) expVarFunc(f func() any) expvar.Func {
}

// ExpVar returns an expvar variable suitable for registering with expvar.Publish.
-func (s *Server) ExpVar() expvar.Var {
+func (s *Server) ExpVar(rateLimitEnabled bool) expvar.Var {
	m := new(metrics.Set)
	m.Set("gauge_memstats_sys0", expvar.Func(func() any { return int64(s.memSys0) }))
	m.Set("gauge_watchers", s.expVarFunc(func() any { return len(s.watchers) }))
@@ -2436,6 +2456,25 @@ func (s *Server) ExpVar() expvar.Var {
	var expvarVersion expvar.String
	expvarVersion.Set(version.Long())
	m.Set("version", &expvarVersion)
+	if rateLimitEnabled {
+		// Rate limiting is currently experimental, its APIs are unstable, and it
+		// must be opted into via --rate-config. Therefore, we only publish related
+		// metrics on demand, to avoid polluting uninterested metrics consumers.
+		m.Set("rate_limit_per_client_bytes_per_second", s.expVarFunc(func() any {
+			return s.rateConfig.PerClientRateLimitBytesPerSec
+		}))
+		m.Set("rate_limit_per_client_burst_bytes", s.expVarFunc(func() any {
+			return s.rateConfig.PerClientRateBurstBytes
+		}))
+		m.Set("rate_limit_global_bytes_per_second", s.expVarFunc(func() any {
+			return s.rateConfig.GlobalRateLimitBytesPerSec
+		}))
+		m.Set("rate_limit_global_burst_bytes", s.expVarFunc(func() any {
+			return s.rateConfig.GlobalRateBurstBytes
+		}))
+		m.Set("rate_limit_per_client_waited", &s.rateLimitPerClientWaited)
+		m.Set("rate_limit_global_waited", &s.rateLimitGlobalWaited)
+	}
	return m
}
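
rateLimitWait above trades [xrate.Limiter.WaitN] for ReserveN so that the
expected delay is observable (feeding the *_waited counters) and the timer
is injectable (feeding the fake timers in the tests below). The ReserveN
half of that pattern in isolation, as a standalone sketch:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	lim := rate.NewLimiter(10, 10) // 10 tokens/sec, burst of 10
	now := time.Now()
	lim.ReserveN(now, 10) // drain the bucket

	r := lim.ReserveN(now, 5)
	if !r.OK() {
		// Only happens when n exceeds the limiter's burst.
		fmt.Println("n exceeds burst; no tokens consumed")
		return
	}
	if delay := r.DelayFrom(now); delay > 0 {
		// A real caller would start a timer for delay and select on it
		// against ctx.Done(); here we just report and give the tokens back.
		fmt.Printf("would wait %v before reading\n", delay)
		r.Cancel()
	}
}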


@@ -962,8 +962,12 @@ func TestPerClientRateLimit(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		t.Cleanup(cancel)
		s := New(key.NewNode(), logger.Discard)
+		defer s.Close()
		c := &sclient{
			ctx: ctx,
+			s:   s,
		}
		lim := &parentChildTokenBuckets{
			// Set parent limit to half of child to enable verification of
@@ -1029,6 +1033,15 @@ func TestPerClientRateLimit(t *testing.T) {
			}
			wantTokens(t, 0, minRateLimitTokenBucketSize)
+
+			// The second rateLimit call had to wait for both child and parent
+			// buckets, so both counters should be 1.
+			if got := s.rateLimitPerClientWaited.Value(); got != 1 {
+				t.Fatalf("rateLimitPerClientWaited = %d, want 1", got)
+			}
+			if got := s.rateLimitGlobalWaited.Value(); got != 1 {
+				t.Fatalf("rateLimitGlobalWaited = %d, want 1", got)
+			}
		})
	})
@@ -1036,8 +1049,12 @@ func TestPerClientRateLimit(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		s := New(key.NewNode(), logger.Discard)
+		defer s.Close()
		c := &sclient{
			ctx: ctx,
+			s:   s,
		}
		lim := &parentChildTokenBuckets{
			child: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize),
@@ -1095,6 +1112,80 @@ func TestPerClientRateLimit(t *testing.T) {
	})
}

+// zeroTimer returns a timer that fires immediately.
+func zeroTimer(_ time.Duration) (<-chan time.Time, func() bool) {
+	t := time.NewTimer(0)
+	return t.C, t.Stop
+}
+
+// neverTimer returns a timer that never fires.
+func neverTimer(_ time.Duration) (<-chan time.Time, func() bool) {
+	return make(chan time.Time), func() bool { return false }
+}
+
+func TestRateLimitWait(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("no_wait", func(t *testing.T) {
+		lim := rate.NewLimiter(10, 10)
+		waited, err := rateLimitWait(ctx, lim, 5, time.Now(), zeroTimer)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if waited != 0 {
+			t.Fatalf("waited = %v, want 0", waited)
+		}
+	})
+
+	t.Run("wait_for_tokens", func(t *testing.T) {
+		lim := rate.NewLimiter(10, 10)
+		now := time.Now()
+		waited, err := rateLimitWait(ctx, lim, 10, now, zeroTimer) // exhaust all tokens
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if waited != 0 {
+			t.Fatalf("waited = %v, want 0", waited)
+		}
+		waited, err = rateLimitWait(ctx, lim, 10, now, zeroTimer)
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if waited == 0 {
+			t.Fatal("waited = 0, want > 0")
+		}
+	})
+
+	t.Run("context_canceled", func(t *testing.T) {
+		lim := rate.NewLimiter(10, 10)
+		now := time.Now()
+		_, err := rateLimitWait(ctx, lim, 10, now, zeroTimer) // exhaust all tokens
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		canceled, cancel := context.WithCancel(ctx) // cancel context so the select picks ctx.Done()
+		cancel()
+		waited, err := rateLimitWait(canceled, lim, 10, now, neverTimer) // neverTimer to only unblock via context
+		if err == nil {
+			t.Fatal("expected error from canceled context")
+		}
+		if waited != 0 {
+			t.Fatalf("waited = %v, want 0", waited)
+		}
+	})
+
+	t.Run("n_exceeds_burst", func(t *testing.T) {
+		lim := rate.NewLimiter(10, 5)
+		waited, err := rateLimitWait(ctx, lim, 10, time.Now(), zeroTimer)
+		if err == nil {
+			t.Fatal("expected error when n > burst")
+		}
+		if waited != 0 {
+			t.Fatalf("waited = %v, want 0", waited)
+		}
+	})
+}
+
func verifyLimiter(t *testing.T, lim *parentChildTokenBuckets, wantRateConfig RateConfig) {
	t.Helper()
	if got := lim.child.Limit(); got != rate.Limit(wantRateConfig.PerClientRateLimitBytesPerSec) {