diff --git a/derp/derpserver/derpserver.go b/derp/derpserver/derpserver.go index ef3699713..4e60fff67 100644 --- a/derp/derpserver/derpserver.go +++ b/derp/derpserver/derpserver.go @@ -172,7 +172,6 @@ type Server struct { meshUpdateBatchSize *metrics.Histogram meshUpdateLoopCount *metrics.Histogram bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush - rateLimitGlobalWaited expvar.Int // number of times global rate limit caused a wait rateLimitPerClientWaited expvar.Int // number of times per-client rate limit caused a wait // TODO(illotum): add metrics for rate limited wait time, consider total seconds vs a histogram. @@ -206,8 +205,7 @@ type Server struct { peerGoneWatchers map[key.NodePublic]set.HandleSet[func(key.NodePublic)] // maps from netip.AddrPort to a client's public key keyOfAddr map[netip.AddrPort]key.NodePublic - rateConfig RateConfig // server-global and per-client DERP frame rate limiting config - recvLim *xrate.Limiter // server-global DERP frame receive limiter + rateConfig RateConfig // per-client DERP frame rate limiting config } // clientSet represents 1 or more *sclients. @@ -519,25 +517,13 @@ const minRateLimitTokenBucketSize = derp.MaxPacketSize + derp.KeyLen // in bytes. type RateConfig struct { // PerClientRateLimitBytesPerSec represents the per-client - // rate limit in bytes per second. A zero value disables per-client rate limiting, - // but global (GlobalRate...) configuration may still apply. + // rate limit in bytes per second. A zero value disables all rate limiting. PerClientRateLimitBytesPerSec uint64 `json:",omitzero"` // PerClientRateBurstBytes represents the per-client token bucket depth, // or burst, in bytes. Any value lower than [minRateLimitTokenBucketSize] // will be increased to [minRateLimitTokenBucketSize] before application. Only // relevant if PerClientRateLimitBytesPerSec is nonzero. 
PerClientRateBurstBytes uint64 `json:",omitzero"` - // GlobalRateLimitBytesPerSec represents the global rate limit in bytes per - // second. A zero value disables global rate limiting, but per-client (PerClient...) - // configuration may still apply. If GlobalRateLimitBytesPerSec is nonzero and less than - // PerClientRateLimitBytesPerSec, then GlobalRateLimitBytesPerSec will be set - // equal to PerClientRateLimitBytesPerSec before application. - GlobalRateLimitBytesPerSec uint64 `json:",omitzero"` - // GlobalRateBurstBytes represents the global token bucket depth, or burst, - // in bytes. Any value lower than [minRateLimitTokenBucketSize] will be increased to - // [minRateLimitTokenBucketSize] before application. Only relevant if - // GlobalRateLimitBytesPerSec is nonzero. - GlobalRateBurstBytes uint64 `json:",omitzero"` } // LoadRateConfig reads and JSON-unmarshals a [RateConfig] from the file at path. @@ -564,36 +550,28 @@ func (s *Server) LoadAndApplyRateConfig(path string) error { return err } applied := s.UpdateRateLimits(rc) - s.logf("rate config applied: global-rate=%d bytes/sec global-burst=%d bytes client-rate=%d bytes/sec, client-burst=%d bytes", - applied.GlobalRateLimitBytesPerSec, applied.GlobalRateBurstBytes, applied.PerClientRateLimitBytesPerSec, applied.PerClientRateBurstBytes) + s.logf("rate config applied: client-rate=%d bytes/sec, client-burst=%d bytes", + applied.PerClientRateLimitBytesPerSec, applied.PerClientRateBurstBytes) return nil } // UpdateRateLimits sets the receive rate limits, updating all existing client -// connections. It returns the applied config, which may differ from rc. If both -// the per-client and global rate limits are 0, rate limiting is disabled. Mesh -// peers are always exempt from rate limiting. +// connections. It returns the applied config, which may differ from rc. If the +// per-client rate limit is 0, rate limiting is disabled. Mesh peers are always +// exempt from rate limiting. 
func (s *Server) UpdateRateLimits(rc RateConfig) (applied RateConfig) { s.mu.Lock() defer s.mu.Unlock() - if rc.PerClientRateLimitBytesPerSec == 0 && rc.GlobalRateLimitBytesPerSec == 0 { - // if per-client and global are disabled, all rate limiting is disabled + if rc.PerClientRateLimitBytesPerSec == 0 { + // all rate limiting is disabled rc = RateConfig{} - } - if rc.PerClientRateLimitBytesPerSec != 0 { - rc.PerClientRateBurstBytes = max(rc.PerClientRateBurstBytes, minRateLimitTokenBucketSize) - } - if rc.GlobalRateLimitBytesPerSec != 0 { - rc.GlobalRateLimitBytesPerSec = max(rc.GlobalRateLimitBytesPerSec, rc.PerClientRateLimitBytesPerSec) - rc.GlobalRateBurstBytes = max(rc.GlobalRateBurstBytes, minRateLimitTokenBucketSize) - s.recvLim = xrate.NewLimiter(xrate.Limit(rc.GlobalRateLimitBytesPerSec), int(rc.GlobalRateBurstBytes)) } else { - s.recvLim = nil + rc.PerClientRateBurstBytes = max(rc.PerClientRateBurstBytes, minRateLimitTokenBucketSize) } s.rateConfig = rc for _, cs := range s.clients { cs.ForeachClient(func(c *sclient) { - c.setRateLimit(rc.PerClientRateLimitBytesPerSec, rc.PerClientRateBurstBytes, s.recvLim) + c.setRateLimit(rc.PerClientRateLimitBytesPerSec, rc.PerClientRateBurstBytes) }) } return rc @@ -761,7 +739,7 @@ func (s *Server) registerClient(c *sclient) { s.mu.Lock() defer s.mu.Unlock() - c.setRateLimit(s.rateConfig.PerClientRateLimitBytesPerSec, s.rateConfig.PerClientRateBurstBytes, s.recvLim) + c.setRateLimit(s.rateConfig.PerClientRateLimitBytesPerSec, s.rateConfig.PerClientRateBurstBytes) cs, ok := s.clients[c.key] if !ok { @@ -1336,22 +1314,15 @@ func (c *sclient) handleFrameSendPacket(_ derp.FrameType, fl uint32) error { return c.sendPkt(dst, p) } -// setRateLimit updates the receive rate limiter. When bytesPerSec is 0 and parent is nil, or the +// setRateLimit updates the receive rate limiter. When bytesPerSec is 0, or the // client is a mesh peer, the limiter is set to nil so that [sclient.rateLimit] is a no-op. 
-func (c *sclient) setRateLimit(bytesPerSec, burst uint64, parent *xrate.Limiter) { - if c.canMesh || (bytesPerSec == 0 && parent == nil) { +func (c *sclient) setRateLimit(bytesPerSec, burst uint64) { + if c.canMesh || bytesPerSec == 0 { c.recvLim.Store(nil) return } - var child *xrate.Limiter - if bytesPerSec != 0 { - child = xrate.NewLimiter(xrate.Limit(bytesPerSec), int(burst)) - } - lim := &parentChildTokenBuckets{ - parent: parent, - child: child, - } - c.recvLim.Store(lim) + limiter := xrate.NewLimiter(xrate.Limit(bytesPerSec), int(burst)) + c.recvLim.Store(limiter) } // rateLimitWait is a reimplementation of [xrate.Limiter.WaitN] via [xrate.Limiter.ReserveN]. @@ -1378,9 +1349,6 @@ func rateLimitWait(ctx context.Context, lim *xrate.Limiter, n int, now time.Time } // rateLimit applies the receive rate limit. -// Per-client rate limiting is applied before global. -// The former lets us differentiate classes of service, -// the latter sets the overall pace of reading. // By limiting here we prevent reading from the buffered reader // [sclient.br] if the limit has been exceeded. 
Any reads done here provide space // within the buffered reader to fill back in with data from @@ -1409,26 +1377,12 @@ func (c *sclient) rateLimit(n int) error { durationWaited time.Duration err error ) - if lim.child != nil { - durationWaited, err = rateLimitWait(c.ctx, lim.child, clampedN, now, newTimer) - if err != nil { - return err - } - if durationWaited > 0 { - c.s.rateLimitPerClientWaited.Add(1) - } + durationWaited, err = rateLimitWait(c.ctx, lim, clampedN, now, newTimer) + if err != nil { + return err } - if lim.parent != nil { - if durationWaited > 0 { - now = c.s.clock.Now() // update 'now' if we already waited - } - durationWaited, err = rateLimitWait(c.ctx, lim.parent, clampedN, now, newTimer) - if err != nil { - return err - } - if durationWaited > 0 { - c.s.rateLimitGlobalWaited.Add(1) - } + if durationWaited > 0 { + c.s.rateLimitPerClientWaited.Add(1) } } return nil @@ -1867,21 +1821,14 @@ type sclient struct { peerGoneLim *rate.Limiter // recvLim is the receive rate limiter. When rate limiting is enabled for a - // non-mesh client, it points to a [parentChildTokenBuckets]. When rate limiting + // non-mesh client, it points to a [xrate.Limiter]. When rate limiting // is disabled or the client is a mesh peer, it is nil and [sclient.rateLimit] // is a no-op. Updated atomically by [sclient.setRateLimit] so that // [sclient.rateLimit] can load it without holding [Server.mu]. - recvLim atomic.Pointer[parentChildTokenBuckets] -} - -// parentChildTokenBuckets contains a parent and child token bucket for the -// purpose of applying in a hierarchical topology. -// -// TODO: consider porting the required APIs from [xrate.Limiter] to [rate.Limiter], -// which is already optimized to use [mono.Time]. 
-type parentChildTokenBuckets struct { - parent *xrate.Limiter // parent may be nil - child *xrate.Limiter // child may be nil + // + // TODO: consider porting the required APIs from [xrate.Limiter] to [rate.Limiter], + // which is already optimized to use [mono.Time]. + recvLim atomic.Pointer[xrate.Limiter] } func (c *sclient) presentFlags() derp.PeerPresentFlags { @@ -2475,14 +2422,7 @@ func (s *Server) ExpVar(rateLimitEnabled bool) expvar.Var { m.Set("rate_limit_per_client_burst_bytes", s.expVarFunc(func() any { return s.rateConfig.PerClientRateBurstBytes })) - m.Set("rate_limit_global_bytes_per_second", s.expVarFunc(func() any { - return s.rateConfig.GlobalRateLimitBytesPerSec - })) - m.Set("rate_limit_global_burst_bytes", s.expVarFunc(func() any { - return s.rateConfig.GlobalRateBurstBytes - })) m.Set("rate_limit_per_client_waited", &s.rateLimitPerClientWaited) - m.Set("rate_limit_global_waited", &s.rateLimitGlobalWaited) } return m } diff --git a/derp/derpserver/derpserver_test.go b/derp/derpserver/derpserver_test.go index a5c0e025d..7143a9b3d 100644 --- a/derp/derpserver/derpserver_test.go +++ b/derp/derpserver/derpserver_test.go @@ -969,27 +969,19 @@ func TestPerClientRateLimit(t *testing.T) { ctx: ctx, s: s, } - lim := &parentChildTokenBuckets{ - // Set parent limit to half of child to enable verification of - // rate limiting across both layers with a single sclient. 
- parent: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize)/2, minRateLimitTokenBucketSize), - child: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize), - } + lim := rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize) c.recvLim.Store(lim) - wantTokens := func(t *testing.T, wantParentTokens, wantChildTokens float64) { + wantTokens := func(t *testing.T, wantTokens float64) { t.Helper() - if lim.parent.Tokens() != wantParentTokens { - t.Fatalf("want parent tokens: %v got: %v", wantParentTokens, lim.parent.Tokens()) - } - if lim.child.Tokens() != wantChildTokens { - t.Fatalf("want child tokens: %v got: %v", wantChildTokens, lim.child.Tokens()) + if lim.Tokens() != wantTokens { + t.Fatalf("want tokens: %v got: %v", wantTokens, lim.Tokens()) } } // First call within burst should not block. c.rateLimit(minRateLimitTokenBucketSize) - wantTokens(t, 0, 0) + wantTokens(t, 0) // Next call exceeds burst, should block until tokens replenish. done := make(chan error, 1) @@ -1005,21 +997,7 @@ func TestPerClientRateLimit(t *testing.T) { default: } - // Advance time by 1 second, the goroutine should still be blocked - // on the parent bucket (negative tokens). - time.Sleep(1 * time.Second) - synctest.Wait() - select { - case err := <-done: - t.Fatalf("rateLimit should have blocked, but returned: %v", err) - default: - } - - // Verify the parent bucket fills at half the rate of the child. - wantTokens(t, -(minRateLimitTokenBucketSize / 2), 0) - - // Advance time by another second, parent should have enough tokens - // to unblock. 
+ // Advance time by 1 second, the goroutine should be unblocked time.Sleep(1 * time.Second) synctest.Wait() @@ -1032,16 +1010,12 @@ func TestPerClientRateLimit(t *testing.T) { t.Fatal("rateLimit should have unblocked after 1s") } - wantTokens(t, 0, minRateLimitTokenBucketSize) + wantTokens(t, 0) - // The second rateLimit call had to wait for both child and parent - // buckets, so both counters should be 1. + // The second rateLimit call had to wait if got := s.rateLimitPerClientWaited.Value(); got != 1 { t.Fatalf("rateLimitPerClientWaited = %d, want 1", got) } - if got := s.rateLimitGlobalWaited.Value(); got != 1 { - t.Fatalf("rateLimitGlobalWaited = %d, want 1", got) - } }) }) @@ -1056,10 +1030,7 @@ func TestPerClientRateLimit(t *testing.T) { ctx: ctx, s: s, } - lim := &parentChildTokenBuckets{ - child: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize), - parent: rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize), - } + lim := rate.NewLimiter(rate.Limit(minRateLimitTokenBucketSize), minRateLimitTokenBucketSize) c.recvLim.Store(lim) // Exhaust burst. 
@@ -1186,20 +1157,14 @@ func TestRateLimitWait(t *testing.T) { }) } -func verifyLimiter(t *testing.T, lim *parentChildTokenBuckets, wantRateConfig RateConfig) { +func verifyLimiter(t *testing.T, lim *rate.Limiter, wantRateConfig RateConfig) { t.Helper() - if got := lim.child.Limit(); got != rate.Limit(wantRateConfig.PerClientRateLimitBytesPerSec) { + if got := lim.Limit(); got != rate.Limit(wantRateConfig.PerClientRateLimitBytesPerSec) { t.Errorf("client rate limit = %v; want %d", got, wantRateConfig.PerClientRateLimitBytesPerSec) } - if got := lim.child.Burst(); got != int(wantRateConfig.PerClientRateBurstBytes) { + if got := lim.Burst(); got != int(wantRateConfig.PerClientRateBurstBytes) { t.Errorf("client burst = %v; want %d", got, wantRateConfig.PerClientRateBurstBytes) } - if got := lim.parent.Limit(); got != rate.Limit(wantRateConfig.GlobalRateLimitBytesPerSec) { - t.Errorf("global rate limit = %v, want %d", got, wantRateConfig.GlobalRateLimitBytesPerSec) - } - if got := lim.parent.Burst(); got != int(wantRateConfig.GlobalRateBurstBytes) { - t.Errorf("global burst = %v, want %d", got, wantRateConfig.GlobalRateBurstBytes) - } } func TestUpdateRateLimits(t *testing.T) { @@ -1208,10 +1173,6 @@ func TestUpdateRateLimits(t *testing.T) { testClientRate1 = minRateLimitTokenBucketSize + 2 testClientBurst2 = minRateLimitTokenBucketSize + 3 testClientRate2 = minRateLimitTokenBucketSize + 4 - testGlobalBurst1 = minRateLimitTokenBucketSize + 5 - testGlobalRate1 = minRateLimitTokenBucketSize + 6 - testGlobalBurst2 = minRateLimitTokenBucketSize + 7 - testGlobalRate2 = minRateLimitTokenBucketSize + 8 ) s := New(key.NewNode(), t.Logf) @@ -1235,8 +1196,6 @@ func TestUpdateRateLimits(t *testing.T) { rc := RateConfig{ PerClientRateLimitBytesPerSec: testClientRate1, PerClientRateBurstBytes: testClientBurst1, - GlobalRateLimitBytesPerSec: testGlobalRate1, - GlobalRateBurstBytes: testGlobalBurst1, } s.UpdateRateLimits(rc) @@ -1257,8 +1216,6 @@ func TestUpdateRateLimits(t 
*testing.T) { rc = RateConfig{ PerClientRateLimitBytesPerSec: testClientRate2, PerClientRateBurstBytes: testClientBurst2, - GlobalRateLimitBytesPerSec: testGlobalRate2, - GlobalRateBurstBytes: testGlobalBurst2, } s.UpdateRateLimits(rc) lim = c.recvLim.Load() @@ -1271,7 +1228,7 @@ func TestUpdateRateLimits(t *testing.T) { s.UpdateRateLimits(RateConfig{}) if got := c.recvLim.Load(); got != nil { - t.Errorf("expected nil limiter after disable, got limit=%v", got.child.Limit()) + t.Errorf("expected nil limiter after disable, got limit=%v", got.Limit()) } // Mesh peer should always have nil limiter regardless of update. @@ -1292,13 +1249,11 @@ func TestUpdateRateLimits(t *testing.T) { rc = RateConfig{ PerClientRateLimitBytesPerSec: testClientRate2, PerClientRateBurstBytes: testClientBurst2, - GlobalRateLimitBytesPerSec: testGlobalRate2, - GlobalRateBurstBytes: testGlobalBurst2, } s.UpdateRateLimits(rc) if got := meshClient.recvLim.Load(); got != nil { - t.Errorf("mesh peer should have nil limiter, got limit=%v", got.child.Limit()) + t.Errorf("mesh peer should have nil limiter, got limit=%v", got.Limit()) } // Non-mesh client should be updated. 
lim = c.recvLim.Load() @@ -1319,8 +1274,6 @@ func TestUpdateRateLimits(t *testing.T) { s.mu.Unlock() rc = RateConfig{ - GlobalRateLimitBytesPerSec: testGlobalRate1, - GlobalRateBurstBytes: testGlobalBurst1, PerClientRateLimitBytesPerSec: testClientRate1, PerClientRateBurstBytes: testClientBurst1, } @@ -1340,17 +1293,14 @@ func TestLoadRateConfig(t *testing.T) { json string wantRateConfig RateConfig }{ - {"all_set", `{"PerClientRateLimitBytesPerSec": 1, "PerClientRateBurstBytes": 2, "GlobalRateLimitBytesPerSec": 3, "GlobalRateBurstBytes": 4}`, RateConfig{ + {"all_set", `{"PerClientRateLimitBytesPerSec": 1, "PerClientRateBurstBytes": 2}`, RateConfig{ PerClientRateLimitBytesPerSec: 1, PerClientRateBurstBytes: 2, - GlobalRateLimitBytesPerSec: 3, - GlobalRateBurstBytes: 4, }}, - {"rate_only", `{"PerClientRateLimitBytesPerSec": 1, "GlobalRateLimitBytesPerSec": 3}`, RateConfig{ + {"rate_only", `{"PerClientRateLimitBytesPerSec": 1}`, RateConfig{ PerClientRateLimitBytesPerSec: 1, - GlobalRateLimitBytesPerSec: 3, }}, - {"zeros", `{"PerClientRateLimitBytesPerSec": 0, "PerClientRateBurstBytes": 0, "GlobalRateLimitBytesPerSec": 0, "GlobalRateBurstBytes": 0}`, RateConfig{}}, + {"zeros", `{"PerClientRateLimitBytesPerSec": 0, "PerClientRateBurstBytes": 0}`, RateConfig{}}, {"empty_json", `{}`, RateConfig{}}, } { t.Run(tt.name, func(t *testing.T) { @@ -1415,8 +1365,8 @@ func TestLoadAndApplyRateConfig(t *testing.T) { s.clients[clientKey] = cs s.mu.Unlock() - f := writeConfig(t, fmt.Sprintf(`{"PerClientRateLimitBytesPerSec": %d, "PerClientRateBurstBytes": %d, "GlobalRateLimitBytesPerSec": %d, "GlobalRateBurstBytes": %d}`, - minRateLimitTokenBucketSize, minRateLimitTokenBucketSize+1, minRateLimitTokenBucketSize+2, minRateLimitTokenBucketSize+3)) + f := writeConfig(t, fmt.Sprintf(`{"PerClientRateLimitBytesPerSec": %d, "PerClientRateBurstBytes": %d}`, + minRateLimitTokenBucketSize, minRateLimitTokenBucketSize+1)) if err := s.LoadAndApplyRateConfig(f); err != nil { 
t.Fatalf("LoadAndApplyRateConfig: %v", err) } @@ -1425,8 +1375,6 @@ func TestLoadAndApplyRateConfig(t *testing.T) { wantRateConfig := RateConfig{ PerClientRateLimitBytesPerSec: minRateLimitTokenBucketSize, PerClientRateBurstBytes: minRateLimitTokenBucketSize + 1, - GlobalRateLimitBytesPerSec: minRateLimitTokenBucketSize + 2, - GlobalRateBurstBytes: minRateLimitTokenBucketSize + 3, } s.mu.Lock() if !reflect.DeepEqual(s.rateConfig, wantRateConfig) { @@ -1446,28 +1394,24 @@ func TestLoadAndApplyRateConfig(t *testing.T) { s := New(key.NewNode(), t.Logf) defer s.Close() - f := writeConfig(t, `{"PerClientRateLimitBytesPerSec": 1250000, "PerClientRateBurstBytes": 10, "GlobalRateLimitBytesPerSec": 1250000, "GlobalRateBurstBytes": 10}`) + f := writeConfig(t, `{"PerClientRateLimitBytesPerSec": 1250000, "PerClientRateBurstBytes": 10}`) if err := s.LoadAndApplyRateConfig(f); err != nil { t.Fatalf("LoadAndApplyRateConfig: %v", err) } s.mu.Lock() gotClientBurst := s.rateConfig.PerClientRateBurstBytes - gotGlobalBurst := s.rateConfig.GlobalRateBurstBytes s.mu.Unlock() if gotClientBurst != minRateLimitTokenBucketSize { t.Errorf("client burst = %d; want %d", gotClientBurst, minRateLimitTokenBucketSize) } - if gotGlobalBurst != minRateLimitTokenBucketSize { - t.Errorf("global burst = %d; want %d", gotGlobalBurst, minRateLimitTokenBucketSize) - } }) t.Run("reload_disables_limiting", func(t *testing.T) { s := New(key.NewNode(), t.Logf) defer s.Close() - f := writeConfig(t, `{"PerClientRateLimitBytesPerSec": 1250000, "PerClientRateBurstBytes": 2500000, "GlobalRateLimitBytesPerSec": 12500000, "GlobalRateBurstBytes": 25000000}`) + f := writeConfig(t, `{"PerClientRateLimitBytesPerSec": 1250000, "PerClientRateBurstBytes": 2500000}`) if err := s.LoadAndApplyRateConfig(f); err != nil { t.Fatal(err) }