Merge PR #9581: Rate Limit Quota Headers

This commit is contained in:
Alexander Bezobchuk 2020-07-29 15:15:05 -04:00 committed by GitHub
parent b20889a489
commit 7b06590909
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1047 additions and 235 deletions

2
go.mod
View File

@ -131,6 +131,7 @@ require (
github.com/ryanuber/go-glob v1.0.0
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/sasha-s/go-deadlock v0.2.0
github.com/sethvargo/go-limiter v0.2.3
github.com/shirou/gopsutil v2.20.6-0.20200630091542-01afd763e6c0+incompatible
github.com/stretchr/testify v1.5.1
github.com/tidwall/pretty v1.0.1 // indirect
@ -146,7 +147,6 @@ require (
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
golang.org/x/tools v0.0.0-20200521155704-91d71f6c2f04
google.golang.org/api v0.29.0
google.golang.org/grpc v1.29.1

2
go.sum
View File

@ -834,6 +834,8 @@ github.com/sean-/conswriter v0.0.0-20180208195008-f5ae3917a627/go.mod h1:7zjs06q
github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6qbIiqJ6/Bqeq25bCLbL7YFmpaFfJDuM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sethvargo/go-limiter v0.2.3 h1:xgphnEi/Ga/1JMU3Y4Zc8PzyOes4fci/wWwHN6alcqg=
github.com/sethvargo/go-limiter v0.2.3/go.mod h1:C0kbSFbiriE5k2FFOe18M1YZbAR2Fiwf72uGu0CXCcU=
github.com/shirou/gopsutil v2.20.6-0.20200630091542-01afd763e6c0+incompatible h1:IYOqH6sML3rQGNVEQ5foLtpDt4TeW8PIUBuI9f8itkI=
github.com/shirou/gopsutil v2.20.6-0.20200630091542-01afd763e6c0+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=

View File

@ -60,6 +60,12 @@ func rateLimitQuotaWrapping(handler http.Handler, core *vault.Core) http.Handler
return
}
if core.RateLimitResponseHeadersEnabled() {
for h, v := range quotaResp.Headers {
w.Header().Set(h, v)
}
}
if !quotaResp.Allowed {
quotaErr := errwrap.Wrapf(fmt.Sprintf("request path %q: {{err}}", path), quotas.ErrRateLimitQuotaExceeded)
respondError(w, http.StatusTooManyRequests, quotaErr)

View File

@ -2533,3 +2533,13 @@ func (c *Core) RateLimitAuditLoggingEnabled() bool {
return false
}
// RateLimitResponseHeadersEnabled returns if the quota configuration allows for
// rate limit quota HTTP headers to be added to responses.
func (c *Core) RateLimitResponseHeadersEnabled() bool {
if c.quotaManager != nil {
return c.quotaManager.RateLimitResponseHeadersEnabled()
}
return false
}

View File

@ -3,6 +3,7 @@ package vault
import (
"context"
"strings"
"time"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/framework"
@ -20,6 +21,10 @@ func (b *SystemBackend) quotasPaths() []*framework.Path {
Type: framework.TypeBool,
Description: "If set, starts audit logging of requests that get rejected due to rate limit quota rule violations.",
},
"enable_rate_limit_response_headers": {
Type: framework.TypeBool,
Description: "If set, additional rate limit quota HTTP headers will be added to responses.",
},
},
Operations: map[logical.Operation]framework.OperationHandler{
logical.UpdateOperation: &framework.PathOperation{
@ -61,9 +66,13 @@ namespace1/auth/userpass adds a quota to userpass in namespace1.`,
},
"rate": {
Type: framework.TypeFloat,
Description: `The maximum number of requests at any given second to be allowed by the quota rule.
Description: `The maximum number of requests in a given interval to be allowed by the quota rule.
The 'rate' must be positive.`,
},
"interval": {
Type: framework.TypeDurationSecond,
Description: "The duration to enforce rate limiting for (default '1s').",
},
},
Operations: map[logical.Operation]framework.OperationHandler{
logical.UpdateOperation: &framework.PathOperation{
@ -88,7 +97,9 @@ func (b *SystemBackend) handleQuotasConfigUpdate() framework.OperationFunc {
if err != nil {
return nil, err
}
config.EnableRateLimitAuditLogging = d.Get("enable_rate_limit_audit_logging").(bool)
config.EnableRateLimitResponseHeaders = d.Get("enable_rate_limit_response_headers").(bool)
entry, err := logical.StorageEntryJSON(quotas.ConfigPath, config)
if err != nil {
@ -99,6 +110,8 @@ func (b *SystemBackend) handleQuotasConfigUpdate() framework.OperationFunc {
}
b.Core.quotaManager.SetEnableRateLimitAuditLogging(config.EnableRateLimitAuditLogging)
b.Core.quotaManager.SetEnableRateLimitResponseHeaders(config.EnableRateLimitResponseHeaders)
return nil, nil
}
}
@ -108,7 +121,8 @@ func (b *SystemBackend) handleQuotasConfigRead() framework.OperationFunc {
config := b.Core.quotaManager.Config()
return &logical.Response{
Data: map[string]interface{}{
"enable_rate_limit_audit_logging": config.EnableRateLimitAuditLogging,
"enable_rate_limit_audit_logging": config.EnableRateLimitAuditLogging,
"enable_rate_limit_response_headers": config.EnableRateLimitResponseHeaders,
},
}, nil
}
@ -135,6 +149,11 @@ func (b *SystemBackend) handleRateLimitQuotasUpdate() framework.OperationFunc {
return logical.ErrorResponse("'rate' is invalid"), nil
}
interval := time.Second * time.Duration(d.Get("interval").(int))
if interval == 0 {
interval = time.Second
}
mountPath := sanitizePath(d.Get("path").(string))
ns := b.Core.namespaceByPath(mountPath)
if ns.ID != namespace.RootNamespaceID {
@ -166,12 +185,13 @@ func (b *SystemBackend) handleRateLimitQuotasUpdate() framework.OperationFunc {
return logical.ErrorResponse("quota rule with similar properties exists under the name %q", quotaByFactors.QuotaName()), nil
}
quota = quotas.NewRateLimitQuota(name, ns.Path, mountPath, rate)
quota = quotas.NewRateLimitQuota(name, ns.Path, mountPath, rate, interval)
default:
rlq := quota.(*quotas.RateLimitQuota)
rlq.NamespacePath = ns.Path
rlq.MountPath = mountPath
rlq.Rate = rate
rlq.Interval = interval
}
entry, err := logical.StorageEntryJSON(quotas.QuotaStoragePath(qType, name), quota)
@ -212,10 +232,11 @@ func (b *SystemBackend) handleRateLimitQuotasRead() framework.OperationFunc {
}
data := map[string]interface{}{
"type": qType,
"name": rlq.Name,
"path": nsPath + rlq.MountPath,
"rate": rlq.Rate,
"type": qType,
"name": rlq.Name,
"path": nsPath + rlq.MountPath,
"rate": rlq.Rate,
"interval": int(rlq.Interval.Seconds()),
}
return &logical.Response{
@ -249,7 +270,7 @@ var quotasHelp = map[string][2]string{
"rate-limit": {
`Get, create or update rate limit resource quota for an optional namespace or
mount.`,
`A rate limit quota will enforce API rate limiting on a per-second basis. A
`A rate limit quota will enforce API rate limiting in a specified interval. A
rate limit quota can be created at the root level or defined on a namespace or
mount by specifying a 'path'. The rate limiter is applied to each unique client
IP address.`,

View File

@ -177,6 +177,10 @@ type Response struct {
// Access is the handle to reach back into the quota rule that processed the
// quota request. This may not be set all the time.
Access Access
// Headers defines any optional headers that may be returned by the quota rule
// to clients.
Headers map[string]string
}
// Config holds operator preferences around quota behaviors
@ -184,6 +188,10 @@ type Config struct {
// EnableRateLimitAuditLogging, if set, starts audit logging of the
// request rejections that arise due to rate limit quota violations.
EnableRateLimitAuditLogging bool `json:"enable_rate_limit_audit_logging"`
// EnableRateLimitResponseHeaders dictates if rate limit quota HTTP headers
// should be added to responses.
EnableRateLimitResponseHeaders bool `json:"enable_rate_limit_response_headers"`
}
// Request contains information required by the quota manager to query and
@ -529,12 +537,24 @@ func (m *Manager) SetEnableRateLimitAuditLogging(val bool) {
m.config.EnableRateLimitAuditLogging = val
}
// SetEnableRateLimitResponseHeaders updates the operator preference regarding
// the rate limit quota HTTP header behavior.
func (m *Manager) SetEnableRateLimitResponseHeaders(val bool) {
m.config.EnableRateLimitResponseHeaders = val
}
// RateLimitAuditLoggingEnabled returns if the quota configuration allows audit
// logging of request rejections due to rate limiting quota rule violations.
func (m *Manager) RateLimitAuditLoggingEnabled() bool {
return m.config.EnableRateLimitAuditLogging
}
// RateLimitResponseHeadersEnabled returns if the quota configuration allows for
// rate limit quota HTTP headers to be added to responses.
func (m *Manager) RateLimitResponseHeadersEnabled() bool {
return m.config.EnableRateLimitResponseHeaders
}
// Config returns the operator preferences in the quota manager
func (m *Manager) Config() *Config {
return m.config
@ -554,8 +574,7 @@ func (m *Manager) Reset() error {
m.storage = nil
m.ctx = nil
m.entManager.Reset()
return nil
return m.entManager.Reset()
}
// dbSchema creates a DB schema for holding all the quota rules. It creates a
@ -636,7 +655,10 @@ func (m *Manager) Invalidate(key string) {
m.logger.Error("failed to invalidate quota config", "error", err)
return
}
m.SetEnableRateLimitAuditLogging(config.EnableRateLimitAuditLogging)
m.SetEnableRateLimitResponseHeaders(config.EnableRateLimitResponseHeaders)
default:
splitKeys := strings.Split(key, "/")
if len(splitKeys) != 2 {

View File

@ -3,6 +3,7 @@ package quotas
import (
"fmt"
"math"
"strconv"
"sync"
"time"
@ -11,7 +12,9 @@ import (
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/pathmanager"
"golang.org/x/time/rate"
"github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
)
var rateLimitExemptPaths = pathmanager.New()
@ -42,38 +45,11 @@ func init() {
})
}
// ClientRateLimiter defines a token bucket based rate limiter for a unique
// addressable client (e.g. IP address). Whenever this client attempts to make
// a request, the lastSeen value will be updated.
type ClientRateLimiter struct {
// lastSeen defines the UNIX timestamp the client last made a request.
lastSeen time.Time
// limiter represents an instance of a token bucket based rate limiter.
limiter *rate.Limiter
}
// newClientRateLimiter returns a token bucket based rate limiter for a client
// that is uniquely addressable, where maxRequests defines the requests-per-second
// and burstSize defines the maximum burst allowed. A caller may provide -1 for
// burstSize to allow the burst value to be roughly equivalent to the RPS. Note,
// the underlying rate limiter is thread-safe.
func newClientRateLimiter(maxRequests float64, burstSize int) *ClientRateLimiter {
if burstSize < 0 {
burstSize = int(math.Ceil(maxRequests))
}
return &ClientRateLimiter{
lastSeen: time.Now().UTC(),
limiter: rate.NewLimiter(rate.Limit(maxRequests), burstSize),
}
}
// Ensure that RateLimitQuota implements the Quota interface
var _ Quota = (*RateLimitQuota)(nil)
// RateLimitQuota represents the quota rule properties that is used to limit the
// number of requests per second for a namespace or mount.
// number of requests in a given interval for a namespace or mount.
type RateLimitQuota struct {
// ID is the identifier of the quota
ID string `json:"id"`
@ -91,40 +67,33 @@ type RateLimitQuota struct {
// MountPath is the path of the mount to which this quota is applicable
MountPath string `json:"mount_path"`
// Rate defines the rate of which allowed requests are refilled per second.
// Rate defines the number of requests allowed per Interval.
Rate float64 `json:"rate"`
lock *sync.RWMutex
logger log.Logger
metricSink *metricsutil.ClusterMetricSink
purgeEnabled bool
// Interval defines the duration to which rate limiting is applied.
Interval time.Duration `json:"interval"`
// purgeInterval defines the interval in seconds in which the RateLimitQuota
// attempts to remove stale entries from the rateQuotas mapping.
lock *sync.RWMutex
store limiter.Store
logger log.Logger
metricSink *metricsutil.ClusterMetricSink
purgeInterval time.Duration
closeCh chan struct{}
// staleAge defines the age in seconds in which a clientRateLimiter is
// considered stale. A clientRateLimiter is considered stale if the delta
// between the current purge time and its lastSeen timestamp is greater than
// this value.
staleAge time.Duration
// rateQuotas contains a mapping from a unique addressable client (e.g. IP address)
// to a clientRateLimiter reference. Every purgeInterval seconds, the RateLimitQuota
// will attempt to remove stale entries from the mapping.
rateQuotas map[string]*ClientRateLimiter
staleAge time.Duration
}
// NewRateLimitQuota creates a quota checker for imposing limits on the number
// of requests per second.
func NewRateLimitQuota(name, nsPath, mountPath string, rate float64) *RateLimitQuota {
// of requests in a given interval. An interval time duration of zero may be
// provided, which will default to 1s when initialized.
func NewRateLimitQuota(name, nsPath, mountPath string, rate float64, interval time.Duration) *RateLimitQuota {
return &RateLimitQuota{
Name: name,
Type: TypeRateLimit,
NamespacePath: nsPath,
MountPath: mountPath,
Rate: rate,
Interval: interval,
purgeInterval: DefaultRateLimitPurgeInterval,
staleAge: DefaultRateLimitStaleAge,
}
}
@ -145,6 +114,10 @@ func (rlq *RateLimitQuota) initialize(logger log.Logger, ms *metricsutil.Cluster
rlq.NamespacePath = "root"
}
if rlq.Interval == 0 {
rlq.Interval = time.Second
}
if rlq.Rate <= 0 {
return fmt.Errorf("invalid avg rps: %v", rlq.Rate)
}
@ -166,38 +139,21 @@ func (rlq *RateLimitQuota) initialize(logger log.Logger, ms *metricsutil.Cluster
rlq.ID = id
}
rlq.purgeInterval = DefaultRateLimitPurgeInterval
rlq.staleAge = DefaultRateLimitStaleAge
rlq.rateQuotas = make(map[string]*ClientRateLimiter)
if !rlq.purgeEnabled {
rlq.purgeEnabled = true
rlq.closeCh = make(chan struct{})
go rlq.purgeClientsLoop()
rlStore, err := memorystore.New(&memorystore.Config{
Tokens: uint64(math.Round(rlq.Rate)), // allow 'rlq.Rate' number of requests per 'Interval'
Interval: rlq.Interval, // time interval in which to enforce rate limiting
SweepInterval: rlq.purgeInterval, // how often stale clients are removed
SweepMinTTL: rlq.staleAge, // how long since the last request a client is considered stale
})
if err != nil {
return err
}
rlq.store = rlStore
return nil
}
func (rlq *RateLimitQuota) hasClient(addr string) bool {
rlq.lock.RLock()
defer rlq.lock.RUnlock()
rlc, ok := rlq.rateQuotas[addr]
return ok && rlc != nil
}
func (rlq *RateLimitQuota) numClients() int {
rlq.lock.RLock()
defer rlq.lock.RUnlock()
return len(rlq.rateQuotas)
}
func (rlq *RateLimitQuota) getPurgeEnabled() bool {
rlq.lock.RLock()
defer rlq.lock.RUnlock()
return rlq.purgeEnabled
}
// quotaID returns the identifier of the quota rule
func (rlq *RateLimitQuota) quotaID() string {
return rlq.ID
@ -208,60 +164,6 @@ func (rlq *RateLimitQuota) QuotaName() string {
return rlq.Name
}
// purgeClientsLoop performs a blocking process where every purgeInterval
// duration, we look for stale clients to remove from the rateQuotas map.
// A ClientRateLimiter is considered stale if its lastSeen timestamp exceeds the
// current time. The loop will continue to run indefinitely until a value is
// sent on the closeCh in which we stop the ticker and exit.
func (rlq *RateLimitQuota) purgeClientsLoop() {
rlq.lock.RLock()
ticker := time.NewTicker(rlq.purgeInterval)
rlq.lock.RUnlock()
for {
select {
case t := <-ticker.C:
rlq.lock.Lock()
for client, crl := range rlq.rateQuotas {
if t.UTC().Sub(crl.lastSeen) >= rlq.staleAge {
delete(rlq.rateQuotas, client)
}
}
rlq.lock.Unlock()
case <-rlq.closeCh:
ticker.Stop()
rlq.lock.Lock()
rlq.purgeEnabled = false
rlq.lock.Unlock()
return
}
}
}
// clientRateLimiter returns a reference to a ClientRateLimiter based on a
// provided client address (e.g. IP address). If the ClientRateLimiter does not
// exist in the RateLimitQuota's mapping, one will be created and set. The
// created RateLimitQuota will have its requests-per-second set to
// RateLimitQuota.AverageRps. If the ClientRateLimiter already exists, the
// lastSeen timestamp will be updated.
func (rlq *RateLimitQuota) clientRateLimiter(addr string) *ClientRateLimiter {
rlq.lock.Lock()
defer rlq.lock.Unlock()
crl, ok := rlq.rateQuotas[addr]
if !ok {
limiter := newClientRateLimiter(rlq.Rate, -1)
rlq.rateQuotas[addr] = limiter
return limiter
}
crl.lastSeen = time.Now().UTC()
return crl
}
// allow decides if the request is allowed by the quota. An error will be
// returned if the request ID or address is empty. If the path is exempt, the
// quota will not be evaluated. Otherwise, the client rate limiter is retrieved
@ -279,8 +181,16 @@ func (rlq *RateLimitQuota) allow(req *Request) (Response, error) {
return resp, fmt.Errorf("missing request client address in quota request")
}
resp.Allowed = rlq.clientRateLimiter(req.ClientAddress).limiter.Allow()
limit, remaining, reset, allow := rlq.store.Take(req.ClientAddress)
resp.Allowed = allow
resp.Headers = map[string]string{
httplimit.HeaderRateLimitLimit: strconv.FormatUint(limit, 10),
httplimit.HeaderRateLimitRemaining: strconv.FormatUint(remaining, 10),
httplimit.HeaderRateLimitReset: time.Unix(0, int64(reset)).UTC().Format(time.RFC1123),
}
if !resp.Allowed {
resp.Headers[httplimit.HeaderRetryAfter] = resp.Headers[httplimit.HeaderRateLimitReset]
rlq.metricSink.IncrCounterWithLabels([]string{"quota", "rate_limit", "violation"}, 1, []metrics.Label{{"name", rlq.Name}})
}
@ -289,7 +199,10 @@ func (rlq *RateLimitQuota) allow(req *Request) (Response, error) {
// close stops the current running client purge loop.
func (rlq *RateLimitQuota) close() error {
close(rlq.closeCh)
if rlq.store != nil {
return rlq.store.Close()
}
return nil
}

View File

@ -10,65 +10,33 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
func TestNewClientRateLimiter(t *testing.T) {
func TestNewRateLimitQuota(t *testing.T) {
testCases := []struct {
maxRequests float64
burstSize int
expectedBurst int
name string
rlq *RateLimitQuota
expectErr bool
}{
{1000, -1, 1000},
{1000, 5000, 5000},
{16.1, -1, 17},
{16.7, -1, 17},
{16.7, 100, 100},
{"valid rate", NewRateLimitQuota("test-rate-limiter", "qa", "/foo/bar", 16.7, time.Second), false},
}
for _, tc := range testCases {
crl := newClientRateLimiter(tc.maxRequests, tc.burstSize)
b := crl.limiter.Burst()
if b != tc.expectedBurst {
t.Fatalf("unexpected burst size; expected: %d, got: %d", tc.expectedBurst, b)
}
}
}
tc := tc
func TestNewRateLimitQuota(t *testing.T) {
rlq := NewRateLimitQuota("test-rate-limiter", "qa", "/foo/bar", 16.7)
if err := rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink()); err != nil {
t.Fatal(err)
}
if !rlq.purgeEnabled {
t.Fatal("expected rate limit quota to start purge loop")
}
if rlq.purgeInterval != DefaultRateLimitPurgeInterval {
t.Fatalf("unexpected purgeInterval; expected: %d, got: %d", DefaultRateLimitPurgeInterval, rlq.purgeInterval)
}
if rlq.staleAge != DefaultRateLimitStaleAge {
t.Fatalf("unexpected staleAge; expected: %d, got: %d", DefaultRateLimitStaleAge, rlq.staleAge)
t.Run(tc.name, func(t *testing.T) {
err := tc.rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink())
require.Equal(t, tc.expectErr, err != nil, err)
})
}
}
func TestRateLimitQuota_Close(t *testing.T) {
rlq := NewRateLimitQuota("test-rate-limiter", "qa", "/foo/bar", 16.7)
if err := rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink()); err != nil {
t.Fatal(err)
}
if err := rlq.close(); err != nil {
t.Fatalf("unexpected error when closing: %v", err)
}
time.Sleep(time.Second) // allow enough time for purgeClientsLoop to receive on closeCh
if rlq.getPurgeEnabled() {
t.Fatal("expected client purging to be disabled after close")
}
rlq := NewRateLimitQuota("test-rate-limiter", "qa", "/foo/bar", 16.7, time.Second)
require.NoError(t, rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink()))
require.NoError(t, rlq.close())
}
func TestRateLimitQuota_Allow(t *testing.T) {
@ -78,17 +46,13 @@ func TestRateLimitQuota_Allow(t *testing.T) {
NamespacePath: "qa",
MountPath: "/foo/bar",
Rate: 16.7,
purgeEnabled: true, // to allow manual setting of purgeInterval and staleAge
// override values to lower durations for testing purposes
purgeInterval: 10 * time.Second,
staleAge: 10 * time.Second,
}
if err := rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink()); err != nil {
t.Fatal(err)
}
// override value and manually start purgeClientsLoop for testing purposes
rlq.purgeInterval = 10 * time.Second
rlq.staleAge = 10 * time.Second
go rlq.purgeClientsLoop()
require.NoError(t, rlq.initialize(logging.NewVaultLogger(log.Trace), metricsutil.BlackholeSink()))
var wg sync.WaitGroup
@ -132,18 +96,12 @@ func TestRateLimitQuota_Allow(t *testing.T) {
time.Sleep(2 * time.Millisecond)
}
}
wg.Wait()
if got, expected := len(results), rlq.numClients(); got != expected {
t.Fatalf("unexpected number of tracked client rate limit quotas; got %d, expected; %d", got, expected)
}
elapsed := time.Since(start)
// evaluate the ideal RPS as (ceil(RPS) + (RPS * totalSeconds))
elapsed := time.Since(start)
ideal := math.Ceil(rlq.Rate) + (rlq.Rate * float64(elapsed) / float64(time.Second))
for addr, cr := range results {
@ -151,22 +109,10 @@ func TestRateLimitQuota_Allow(t *testing.T) {
numFail := cr.atomicNumFail.Load()
// ensure there were some failed requests for the namespace
if numFail == 0 {
t.Fatalf("expected some requests to fail; addr: %s, numSuccess: %d, numFail: %d, elapsed: %d", addr, numAllow, numFail, elapsed)
}
require.NotZerof(t, numFail, "expected some requests to fail; addr: %s, numSuccess: %d, numFail: %d, elapsed: %d", addr, numAllow, numFail, elapsed)
// ensure that we should never get more requests than allowed for the namespace
if want := int32(ideal + 1); numAllow > want {
t.Fatalf("too many successful requests; addr: %s, want: %d, numSuccess: %d, numFail: %d, elapsed: %d", addr, want, numAllow, numFail, elapsed)
}
}
// allow enough time for the client to be purged
time.Sleep(rlq.purgeInterval * 2)
for addr := range results {
if rlq.hasClient(addr) {
t.Fatalf("expected stale client to be purged: %s", addr)
}
want := int32(ideal + 1)
require.Falsef(t, numAllow > want, "too many successful requests; addr: %s, want: %d, numSuccess: %d, numFail: %d, elapsed: %d", addr, want, numAllow, numFail, elapsed)
}
}

View File

@ -3,26 +3,23 @@ package quotas
import (
"context"
"testing"
"time"
"github.com/go-test/deep"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/stretchr/testify/require"
)
func TestQuotas_Precedence(t *testing.T) {
qm, err := NewManager(logging.NewVaultLogger(log.Trace), nil, metricsutil.BlackholeSink())
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
setQuotaFunc := func(t *testing.T, name, nsPath, mountPath string) Quota {
t.Helper()
quota := NewRateLimitQuota(name, nsPath, mountPath, 10)
err := qm.SetQuota(context.Background(), TypeRateLimit.String(), quota, true)
if err != nil {
t.Fatal(err)
}
quota := NewRateLimitQuota(name, nsPath, mountPath, 10, time.Second)
require.NoError(t, qm.SetQuota(context.Background(), TypeRateLimit.String(), quota, true))
return quota
}
@ -33,9 +30,8 @@ func TestQuotas_Precedence(t *testing.T) {
NamespacePath: nsPath,
MountPath: mountPath,
})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if diff := deep.Equal(expected, quota); len(diff) > 0 {
t.Fatal(diff)
}

202
vendor/github.com/sethvargo/go-limiter/LICENSE generated vendored Normal file
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

49
vendor/github.com/sethvargo/go-limiter/Makefile generated vendored Normal file
View File

@ -0,0 +1,49 @@
VETTERS = "asmdecl,assign,atomic,bools,buildtag,cgocall,composites,copylocks,errorsas,httpresponse,loopclosure,lostcancel,nilfunc,printf,shift,stdmethods,structtag,tests,unmarshal,unreachable,unsafeptr,unusedresult"
GOFMT_FILES = $(shell go list -f '{{.Dir}}' ./...)
benchmarks:
@(cd benchmarks/ && go test -bench=. -benchmem -benchtime=1s ./...)
.PHONY: benchmarks
fmtcheck:
@command -v goimports > /dev/null 2>&1 || (cd tools/ && go get golang.org/x/tools/cmd/goimports)
@CHANGES="$$(goimports -d $(GOFMT_FILES))"; \
if [ -n "$${CHANGES}" ]; then \
echo "Unformatted (run goimports -w .):\n\n$${CHANGES}\n\n"; \
exit 1; \
fi
@# Annoyingly, goimports does not support the simplify flag.
@CHANGES="$$(gofmt -s -d $(GOFMT_FILES))"; \
if [ -n "$${CHANGES}" ]; then \
echo "Unformatted (run gofmt -s -w .):\n\n$${CHANGES}\n\n"; \
exit 1; \
fi
.PHONY: fmtcheck
spellcheck:
@command -v misspell > /dev/null 2>&1 || (cd tools/ && go get github.com/client9/misspell/cmd/misspell)
@misspell -locale="US" -error -source="text" **/*
.PHONY: spellcheck
staticcheck:
@command -v staticcheck > /dev/null 2>&1 || (cd tools/ && go get honnef.co/go/tools/cmd/staticcheck)
@staticcheck -checks="all" -tests $(GOFMT_FILES)
.PHONY: staticcheck
test:
@go test \
-count=1 \
-short \
-timeout=5m \
-vet="${VETTERS}" \
./...
.PHONY: test
test-acc:
@go test \
-count=1 \
-race \
-timeout=10m \
-vet="${VETTERS}" \
./...
.PHONY: test-acc

150
vendor/github.com/sethvargo/go-limiter/README.md generated vendored Normal file
View File

@ -0,0 +1,150 @@
# Go Rate Limiter
[![GoDoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://pkg.go.dev/mod/github.com/sethvargo/go-limiter)
[![GitHub Actions](https://img.shields.io/github/workflow/status/sethvargo/go-limiter/Test?style=flat-square)](https://github.com/sethvargo/go-limiter/actions?query=workflow%3ATest)
This package provides a rate limiter in Go (Golang), suitable for use in HTTP
servers and distributed workloads. It's specifically designed for
configurability and flexibility without compromising throughput.
## Usage
1. Create a store. This example uses an in-memory store:
```golang
store, err := memorystore.New(&memorystore.Config{
// Number of tokens allowed per interval.
Tokens: 15,
// Interval until tokens reset.
Interval: time.Minute,
})
if err != nil {
log.Fatal(err)
}
```
1. Determine the limit by calling `Take()` on the store:
```golang
// key is the unique value upon which you want to rate limit, like an IP or
// MAC address.
key := "127.0.0.1"
limit, remaining, reset, ok := store.Take(key)
// limit is the configured limit (15 in this example).
_ = limit
// remaining is the number of tokens remaining (14 now).
_ = remaining
// reset is the unix nanoseconds at which the tokens will replenish.
_ = reset
// ok indicates whether the take was successful. If the key is over the
// configured limit, ok will be false.
_ = ok
// Here's a more realistic example:
if !ok {
return fmt.Errorf("rate limited: retry at %v", reset)
}
```
There's also HTTP middleware via the `httplimit` package. After creating a
store, wrap Go's standard HTTP handler:
```golang
middleware, err := httplimit.NewMiddleware(store, httplimit.IPKeyFunc())
if err != nil {
log.Fatal(err)
}
mux1 := http.NewServeMux()
mux1.Handle("/", middleware.Handle(doWork)) // doWork is your original handler
```
The middleware automatically set the following headers, conforming to the latest
RFCs:
- `X-RateLimit-Limit` - configured rate limit (constant).
- `X-RateLimit-Remaining` - number of remaining tokens in current interval.
- `X-RateLimit-Reset` - UTC time when the limit resets.
- `Retry-After` - Time at which to retry
## Why _another_ Go rate limiter?
I really wanted to learn more about the topic and possibly implementations. The
existing packages in the Go ecosystem either lacked flexibility or traded
flexibility for performance. I wanted to write a package that was highly
extensible while still offering the highest levels of performance.
### Speed and performance
How fast is it? You can run the benchmarks yourself, but here's a few sample
benchmarks with 100,000 unique keys. I added commas to the output for clarity,
but you can run the benchmarks via `make benchmarks`:
```text
$ make benchmarks
BenchmarkSethVargoMemory/memory/serial-7 13,706,899 81.7 ns/op 16 B/op 1 allocs/op
BenchmarkSethVargoMemory/memory/parallel-7 7,900,639 151 ns/op 61 B/op 3 allocs/op
BenchmarkSethVargoMemory/sweep/serial-7 19,601,592 58.3 ns/op 0 B/op 0 allocs/op
BenchmarkSethVargoMemory/sweep/parallel-7 21,042,513 55.2 ns/op 0 B/op 0 allocs/op
BenchmarkThrottled/memory/serial-7 6,503,260 176 ns/op 0 B/op 0 allocs/op
BenchmarkThrottled/memory/parallel-7 3,936,655 297 ns/op 0 B/op 0 allocs/op
BenchmarkThrottled/sweep/serial-7 6,901,432 171 ns/op 0 B/op 0 allocs/op
BenchmarkThrottled/sweep/parallel-7 5,948,437 202 ns/op 0 B/op 0 allocs/op
BenchmarkTollbooth/memory/serial-7 3,064,309 368 ns/op 0 B/op 0 allocs/op
BenchmarkTollbooth/memory/parallel-7 2,658,014 448 ns/op 0 B/op 0 allocs/op
BenchmarkTollbooth/sweep/serial-7 2,769,937 430 ns/op 192 B/op 3 allocs/op
BenchmarkTollbooth/sweep/parallel-7 2,216,211 546 ns/op 192 B/op 3 allocs/op
BenchmarkUber/memory/serial-7 13,795,612 94.2 ns/op 0 B/op 0 allocs/op
BenchmarkUber/memory/parallel-7 7,503,214 159 ns/op 0 B/op 0 allocs/op
BenchmarkUlule/memory/serial-7 2,964,438 405 ns/op 24 B/op 2 allocs/op
BenchmarkUlule/memory/parallel-7 2,441,778 469 ns/op 24 B/op 2 allocs/op
```
There's likely still optimizations to be had, pull requests are welcome!
### Ecosystem
Many of the existing packages in the ecosystem take dependencies on other
packages. I'm an advocate of very thin libraries, and I don't think a rate
limiter should be pulling external packages. That's why **go-limit uses only the
Go standard library**.
### Flexible and extensible
Most of the existing rate limiting libraries make a strong assumption that rate
limiting is only for HTTP services. Baked in that assumption are more
assumptions like rate limiting by "IP address" or are limited to a resolution of
"per second". While go-limit supports rate limiting at the HTTP layer, it can
also be used to rate limit literally anything. It rate limits on a user-defined
arbitrary string key.
### Stores
#### Memory
Memory is the fastest store, but only works on a single container/virtual
machine since there's no way to share the state.
[Learn more](https://pkg.go.dev/github.com/sethvargo/go-limiter/memorystore).
#### Redis
Redis uses Redis + Lua as a shared pool, but comes at a performance cost.
[Learn more](https://pkg.go.dev/github.com/sethvargo/go-limiter/redisstore).
#### Noop
Noop does no rate limiting, but still implements the interface - useful for
testing and local development.
[Learn more](https://pkg.go.dev/github.com/sethvargo/go-limiter/noopstore).

3
vendor/github.com/sethvargo/go-limiter/go.mod generated vendored Normal file
View File

@ -0,0 +1,3 @@
module github.com/sethvargo/go-limiter
go 1.14

0
vendor/github.com/sethvargo/go-limiter/go.sum generated vendored Normal file
View File

View File

@ -0,0 +1,120 @@
// Package httplimit provides middleware for rate limiting HTTP handlers.
//
// The implementation is designed to work with Go's built-in http.Handler and
// http.HandlerFunc interfaces, so it will also work with any popular web
// frameworks that support middleware with these properties.
package httplimit
import (
"fmt"
"net"
"net/http"
"strconv"
"time"
"github.com/sethvargo/go-limiter"
)
const (
// HeaderRateLimitLimit, HeaderRateLimitRemaining, and HeaderRateLimitReset
// are the recommended return header values from IETF on rate limiting. Reset
// is in UTC time.
HeaderRateLimitLimit = "X-RateLimit-Limit"
HeaderRateLimitRemaining = "X-RateLimit-Remaining"
HeaderRateLimitReset = "X-RateLimit-Reset"
// HeaderRetryAfter is the header used to indicate when a client should retry
// requests (when the rate limit expires), in UTC time.
HeaderRetryAfter = "Retry-After"
)
// KeyFunc is a function that accepts an http request and returns a string key
// that uniquely identifies this request for the purpose of rate limiting.
//
// KeyFuncs are called on each request, so be mindful of performance and
// implement caching where possible. If a KeyFunc returns an error, the HTTP
// handler will return Internal Server Error and will NOT take from the limiter
// store.
type KeyFunc func(r *http.Request) (string, error)
// IPKeyFunc returns a function that keys data based on the incoming requests IP
// address. By default this uses the RemoteAddr, but you can also specify a list
// of headers which will be checked for an IP address first (e.g.
// "X-Forwarded-For"). Headers are retrieved using Header.Get(), which means
// they are case insensitive.
func IPKeyFunc(headers ...string) KeyFunc {
return func(r *http.Request) (string, error) {
for _, h := range headers {
if v := r.Header.Get(h); v != "" {
return v, nil
}
}
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return "", err
}
return ip, nil
}
}
// Middleware is a handler/mux that can wrap other middlware to implement HTTP
// rate limiting. It can rate limit based on an arbitrary KeyFunc, and supports
// anything that implements limiter.Store.
type Middleware struct {
store limiter.Store
keyFunc KeyFunc
}
// NewMiddleware creates a new middleware suitable for use as an HTTP handler.
// This function returns an error if either the Store or KeyFunc are nil.
func NewMiddleware(s limiter.Store, f KeyFunc) (*Middleware, error) {
if s == nil {
return nil, fmt.Errorf("store cannot be nil")
}
if f == nil {
return nil, fmt.Errorf("key function cannot be nil")
}
return &Middleware{
store: s,
keyFunc: f,
}, nil
}
// Handle returns the HTTP handler as a middleware. This handler calls Take() on
// the store and sets the common rate limiting headers. If the take is
// successful, the remaining middleware is called. If take is unsuccessful, the
// middleware chain is halted and the function renders a 429 to the caller with
// metadata about when it's safe to retry.
func (m *Middleware) Handle(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Call the key function - if this fails, it's an internal server error.
key, err := m.keyFunc(r)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
// Take from the store.
limit, remaining, reset, ok := m.store.Take(key)
resetTime := time.Unix(0, int64(reset)).UTC().Format(time.RFC1123)
// Set headers (we do this regardless of whether the request is permitted).
w.Header().Set(HeaderRateLimitLimit, strconv.FormatUint(limit, 10))
w.Header().Set(HeaderRateLimitRemaining, strconv.FormatUint(remaining, 10))
w.Header().Set(HeaderRateLimitReset, resetTime)
// Fail if there were no tokens remaining.
if !ok {
w.Header().Set(HeaderRetryAfter, resetTime)
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}
// If we got this far, we're allowed to continue, so call the next middleware
// in the stack to continue processing.
next.ServeHTTP(w, r)
})
}

2
vendor/github.com/sethvargo/go-limiter/limiter.go generated vendored Normal file
View File

@ -0,0 +1,2 @@
// Package limiter defines rate limiting systems.
package limiter

View File

@ -0,0 +1,326 @@
// Package memorystore defines an in-memory storage system for limiting.
package memorystore
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/sethvargo/go-limiter"
)
var _ limiter.Store = (*store)(nil)
type store struct {
tokens uint64
interval time.Duration
rate float64
sweepInterval time.Duration
sweepMinTTL uint64
data map[string]*bucket
dataLock sync.RWMutex
stopped uint32
stopCh chan struct{}
}
// Config is used as input to New. It defines the behavior of the storage
// system.
type Config struct {
// Tokens is the number of tokens to allow per interval. The default value is
// 1.
Tokens uint64
// Interval is the time interval upon which to enforce rate limiting. The
// default value is 1 second.
Interval time.Duration
// SweepInterval is the rate at which to run the garabage collection on stale
// entries. Setting this to a low value will optimize memory consumption, but
// will likely reduce performance and increase lock contention. Setting this
// to a high value will maximum throughput, but will increase the memory
// footprint. This can be tuned in combination with SweepMinTTL to control how
// long stale entires are kept. The default value is 6 hours.
SweepInterval time.Duration
// SweepMinTTL is the minimum amount of time a session must be inactive before
// clearing it from the entries. There's no validation, but this should be at
// least as high as your rate limit, or else the data store will purge records
// before they limit is applied. The default value is 12 hours.
SweepMinTTL time.Duration
// InitialAlloc is the size to use for the in-memory map. Go will
// automatically expand the buffer, but choosing higher number can trade
// memory consumption for performance as it limits the number of times the map
// needs to expand. The default value is 4096.
InitialAlloc int
}
// New creates an in-memory rate limiter that uses a bucketing model to limit
// the number of permitted events over an interval. It's optimized for runtime
// and memory efficiency.
func New(c *Config) (limiter.Store, error) {
if c == nil {
c = new(Config)
}
tokens := uint64(1)
if c.Tokens > 0 {
tokens = c.Tokens
}
interval := 1 * time.Second
if c.Interval > 0 {
interval = c.Interval
}
sweepInterval := 6 * time.Hour
if c.SweepInterval > 0 {
sweepInterval = c.SweepInterval
}
sweepMinTTL := 12 * time.Hour
if c.SweepMinTTL > 0 {
sweepMinTTL = c.SweepMinTTL
}
initialAlloc := 4096
if c.InitialAlloc > 0 {
initialAlloc = c.InitialAlloc
}
s := &store{
tokens: tokens,
interval: interval,
rate: float64(interval) / float64(tokens),
sweepInterval: sweepInterval,
sweepMinTTL: uint64(sweepMinTTL),
data: make(map[string]*bucket, initialAlloc),
stopCh: make(chan struct{}),
}
go s.purge()
return s, nil
}
// Take attempts to remove a token from the named key. If the take is
// successful, it returns true, otherwise false. It also returns the configured
// limit, remaining tokens, and reset time.
func (s *store) Take(key string) (uint64, uint64, uint64, bool) {
// If the store is stopped, all requests are rejected.
if atomic.LoadUint32(&s.stopped) == 1 {
return 0, 0, 0, false
}
// Acquire a read lock first - this allows other to concurrently check limits
// without taking a full lock.
s.dataLock.RLock()
if b, ok := s.data[key]; ok {
s.dataLock.RUnlock()
return b.take()
}
s.dataLock.RUnlock()
// Unfortunately we did not find the key in the map. Take out a full lock. We
// have to check if the key exists again, because it's possible another
// goroutine created it between our shared lock and exclusive lock.
s.dataLock.Lock()
if b, ok := s.data[key]; ok {
s.dataLock.Unlock()
return b.take()
}
// This is the first time we've seen this entry (or it's been garbage
// collected), so create the bucket and take an initial request.
b := newBucket(s.tokens, s.interval, s.rate)
// Add it to the map and take.
s.data[key] = b
s.dataLock.Unlock()
return b.take()
}
// Close stops the memory limiter and cleans up any outstanding sessions. You
// should absolutely always call Close() as it releases the memory consumed by
// the map AND releases the tickers.
func (s *store) Close() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
// Close the channel to prevent future purging.
close(s.stopCh)
// Delete all the things.
s.dataLock.Lock()
for k := range s.data {
delete(s.data, k)
}
s.dataLock.Unlock()
return nil
}
// purge continually iterates over the map and purges old values on the provided
// sweep interval. Earlier designs used a go-function-per-item expiration, but
// it actually generated *more* lock contention under normal use. The most
// performant option with real-world data was a global garbage collection on a
// fixed interval.
func (s *store) purge() {
ticker := time.NewTicker(s.sweepInterval)
defer ticker.Stop()
for {
select {
case <-s.stopCh:
return
case <-ticker.C:
}
s.dataLock.Lock()
now := fastnow()
for k, b := range s.data {
lastTick := (*bucketState)(atomic.LoadPointer(&b.bucketState)).lastTick
lastTime := b.startTime + (lastTick * uint64(b.interval))
if now-lastTime > s.sweepMinTTL {
delete(s.data, k)
}
}
s.dataLock.Unlock()
}
}
// bucket is an internal wrapper around a taker.
type bucket struct {
// startTime is the number of nanoseconds from unix epoch when this bucket was
// initially created.
startTime uint64
// maxTokens is the maximum number of tokens permitted on the bucket at any
// time. The number of available tokens will never exceed this value.
maxTokens uint64
// interval is the time at which ticking should occur.
interval time.Duration
// bucketState is the mutable internal state of the event. It includes the
// current number of available tokens and the last time the clock ticked. It
// should always be loaded with atomic as it is not concurrent safe.
bucketState unsafe.Pointer
// fillRate is the number of tokens to add per nanosecond. It is calculated
// based on the provided maxTokens and interval.
fillRate float64
}
// bucketState represents the internal bucket state.
type bucketState struct {
// availableTokens is the current point-in-time number of tokens remaining.
// This value changes frequently and must be guarded by an atomic read/write.
availableTokens uint64
// lastTick is the last clock tick, used to re-calculate the number of tokens
// on the bucket.
lastTick uint64
}
// newBucket creates a new bucket from the given tokens and interval.
func newBucket(tokens uint64, interval time.Duration, rate float64) *bucket {
b := &bucket{
startTime: fastnow(),
maxTokens: tokens,
interval: interval,
fillRate: rate,
bucketState: unsafe.Pointer(&bucketState{
availableTokens: tokens,
}),
}
return b
}
// take attempts to remove a token from the bucket. If there are no tokens
// available and the clock has ticked forward, it recalculates the number of
// tokens and retries. It returns the limit, remaining tokens, time until
// refresh, and whether the take was successful.
func (b *bucket) take() (uint64, uint64, uint64, bool) {
// Capture the current request time, current tick, and amount of time until
// the bucket resets.
now := fastnow()
currTick := tick(b.startTime, now, b.interval)
next := b.startTime + ((currTick + 1) * uint64(b.interval))
for {
curr := atomic.LoadPointer(&b.bucketState)
currState := (*bucketState)(curr)
lastTick := currState.lastTick
tokens := currState.availableTokens
if lastTick < currTick {
tokens = availableTokens(currState.lastTick, currTick, b.maxTokens, b.fillRate)
lastTick = currTick
if !atomic.CompareAndSwapPointer(&b.bucketState, curr, unsafe.Pointer(&bucketState{
availableTokens: tokens,
lastTick: lastTick,
})) {
// Someone else modified the value
continue
}
}
if tokens > 0 {
tokens--
if !atomic.CompareAndSwapPointer(&b.bucketState, curr, unsafe.Pointer(&bucketState{
availableTokens: tokens,
lastTick: lastTick,
})) {
// There were tokens left, but someone took them :(
continue
}
return b.maxTokens, tokens, next, true
}
// Returning the TTL until next tick.
return b.maxTokens, 0, next, false
}
}
// availableTokens returns the number of available tokens, up to max, between
// the two ticks.
func availableTokens(last, curr, max uint64, fillRate float64) uint64 {
delta := curr - last
available := uint64(float64(delta) * fillRate)
if available > max {
available = max
}
return available
}
// tick is the total number of times the current interval has occurred between
// when the time started (start) and the current time (curr). For example, if
// the start time was 12:30pm and it's currently 1:00pm, and the interval was 5
// minutes, tick would return 6 because 1:00pm is the 6th 5-minute tick. Note
// that tick would return 5 at 12:59pm, because it hasn't reached the 6th tick
// yet.
func tick(start, curr uint64, interval time.Duration) uint64 {
return (curr - start) / uint64(interval)
}
//go:noescape
//go:linkname walltime runtime.walltime
func walltime() (int64, int32)
// fastnow returns a monotonic clock value. The actual value will differ across
// systems, but that's okay because we generally only care about the deltas.
func fastnow() uint64 {
x, y := walltime()
return uint64(x)*1e9 + uint64(y)
}

33
vendor/github.com/sethvargo/go-limiter/store.go generated vendored Normal file
View File

@ -0,0 +1,33 @@
package limiter
import "io"
// Store is an interface for limiter storage backends.
//
// Keys should be hash, sanitized, or otherwise scrubbed of identifiable
// information they will be given to the store in plaintext. If you're rate
// limiting by IP address, for example, the IP address would be stored in the
// storage system in plaintext. This may be undesirable in certain situations,
// like when the store is a public database. In those cases, you should hash or
// HMAC the key before passing giving it to the store. If you want to encrypt
// the value, you must use homomorphic encryption to ensure the value always
// encrypts to the same ciphertext.
type Store interface {
// Take takes a token from the given key if available, returning:
//
// - the configured limit size
// - the number of remaining tokens in the interval
// - the server time when new tokens will be available
// - whether the take was successful
//
// If "ok" is false, the take was unsuccessful and the caller should NOT
// service the request.
//
// See the note about keys on the interface documentation.
Take(key string) (limit, remaining, reset uint64, ok bool)
// Close terminates the store and cleans up any data structures or connections
// that may remain open. After a store is stopped, Take() should always return
// zero values.
io.Closer
}

4
vendor/modules.txt vendored
View File

@ -791,6 +791,10 @@ github.com/samuel/go-zookeeper/zk
github.com/sasha-s/go-deadlock
# github.com/satori/go.uuid v1.2.0
github.com/satori/go.uuid
# github.com/sethvargo/go-limiter v0.2.3
github.com/sethvargo/go-limiter
github.com/sethvargo/go-limiter/httplimit
github.com/sethvargo/go-limiter/memorystore
# github.com/shirou/gopsutil v2.20.6-0.20200630091542-01afd763e6c0+incompatible
github.com/shirou/gopsutil/cpu
github.com/shirou/gopsutil/disk

View File

@ -19,12 +19,15 @@ The `/sys/quotas/config` endpoint is used to configure rate limit quotas.
- `enable_rate_limit_audit_logging` `(bool: false)` - If set, starts audit logging
of requests that get rejected due to rate limit quota rule violations.
- `enable_rate_limit_response_headers` `(bool: false)` - If set, additional rate
limit quota HTTP headers will be added to responses.
### Sample Payload
```json
{
"enable_rate_limit_audit_logging": true,
"enable_rate_limit_response_headers": true,
}
```
@ -62,7 +65,8 @@ $ curl \
"lease_duration": 0,
"renewable": false,
"data": {
"enable_rate_limit_audit_logging": false
"enable_rate_limit_audit_logging": false,
"enable_rate_limit_response_headers": false
},
"warnings": null
}

View File

@ -29,15 +29,17 @@ either be a namespace or mount.
"moving" effects. For example, updating `auth/userpass` to
`namespace1/auth/userpass` moves this quota from being a global mount quota to a
namespace specific mount quota. **Note, namespaces are supported in Enterprise only**.
- `rate` `(float: 0.0)` - The maximum number of requests at any given second to
- `rate` `(float: 0.0)` - The maximum number of requests in a given interval to
be allowed by the quota rule. The `rate` must be positive.
- `interval` `(string: "")` - The duration to enforce rate limiting for (default `"1s"`).
### Sample Payload
```json
{
"path": "",
"rate": 897.3
"rate": 897.3,
"interval": "2m"
}
```
@ -94,6 +96,7 @@ $ curl \
"lease_duration": 0,
"renewable": false,
"data": {
"interval": "2m0s",
"name": "global-rate-limiter",
"path": "",
"rate": 897.3,