vault/sdk/plugin/grpc_backend_server.go
Mike Palmiotto 43be9fc18a
Request Limiter (#25093)
This commit introduces two new adaptive concurrency limiters in Vault,
which should handle overloading of the server during periods of
untenable request rate. The limiter adjusts the number of allowable
in-flight requests based on latency measurements performed across the
request duration. This approach allows us to reject entire requests
prior to doing any work and prevents clients from exceeding server
capacity.

The limiters intentionally target two separate vectors that have been
proven to lead to server over-utilization.

- Back pressure from the storage backend, resulting in bufferbloat in
  the WAL system. (enterprise)
- Back pressure from CPU over-utilization via PKI issue requests
  (specifically for RSA keys), resulting in failed heartbeats.

Storage constraints can be accounted for by limiting logical requests
according to their http.Method. We only limit requests with write-based
methods, since these will result in storage Puts and exhibit the
aforementioned bufferbloat.

CPU constraints are accounted for using the same underlying library and
technique; however, they require special treatment. The maximum number
of concurrent pki/issue requests found in testing (again, specifically
for RSA keys) is far lower than the minimum tolerable write request
rate. Without separate limiting, we would artificially impose limits on
tolerable request rates for non-PKI requests. To specifically target PKI
issue requests, we add a new PathsSpecial field, called limited,
allowing backends to specify a list of paths which should get
special-case request limiting.

For the sake of code cleanliness and future extensibility, we introduce
the concept of a LimiterRegistry. The registry proposed in this PR has
two entries, corresponding with the two vectors above. Each Limiter
entry has its own corresponding maximum and minimum concurrency,
allowing them to react to latency deviation independently and handle
high volumes of requests to targeted bottlenecks (CPU and storage).

In both cases, utilization will be effectively throttled before Vault
reaches any degraded state. The resulting 503 - Service Unavailable is a
retryable HTTP response code, which can be handled to gracefully retry
and eventually succeed. Clients should handle this by retrying with
jitter and exponential backoff. This is done within Vault's API, using
the go-retryablehttp library.

Limiter testing was performed via benchmarks of mixed workloads and
across a deployment of agent pods with great success.
2024-01-26 14:26:21 -05:00

298 lines
7.8 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package plugin
import (
"context"
"errors"
"fmt"
"sync"
"github.com/hashicorp/go-plugin"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/plugin/pb"
"google.golang.org/grpc"
)
// ErrServerInMetadataMode is the error returned by the RPC handlers in this
// file that refuse to call into the backend while
// pluginutil.InMetadataMode() reports true.
var ErrServerInMetadataMode = errors.New("plugin server can not perform action while in metadata mode")
// singleImplementationID is the string used to define the instance ID of a
// non-multiplexed plugin. It is the key under which Setup registers the
// backend instance when multiplexing is not supported.
const singleImplementationID string = "single"
// backendInstance pairs a backend with the brokered connection that was
// dialed for it during Setup.
type backendInstance struct {
// brokeredClient is the connection returned by the broker's Dial in Setup;
// handlers build their storage client on top of it.
brokeredClient *grpc.ClientConn
// backend is the logical.Backend produced by the server's factory.
backend logical.Backend
}
// backendGRPCPluginServer is the plugin-side gRPC server that forwards
// backend RPCs to the logical.Backend instances it has set up.
type backendGRPCPluginServer struct {
pb.UnimplementedBackendServer
logical.UnimplementedPluginVersionServer
// broker is dialed in Setup (using the BrokerID from the setup args) to
// obtain the connection backing storage, system view, and events clients.
broker *plugin.GRPCBroker
// instances holds the backends created by Setup, keyed by multiplex ID, or
// by singleImplementationID when multiplexing is not supported.
instances map[string]backendInstance
// instancesLock guards instances.
instancesLock sync.RWMutex
// multiplexingSupport selects whether the instance ID is read from the
// request context (true) or fixed to singleImplementationID (false).
multiplexingSupport bool
// factory is invoked in Setup to construct the backend.
factory logical.Factory
// logger is handed to the backend through logical.BackendConfig in Setup.
logger log.Logger
}
// getBackendAndBrokeredClientInternal looks up the backend and brokered
// connection for the request described by ctx. It takes no lock itself, so
// the caller must already hold instancesLock.
//
// With multiplexing enabled, the instance registered under the multiplex ID
// from ctx is preferred; a failure to read that ID is returned as-is. If no
// such instance exists (or multiplexing is disabled), the instance stored
// under singleImplementationID is used. An error is returned when neither is
// present.
func (b *backendGRPCPluginServer) getBackendAndBrokeredClientInternal(ctx context.Context) (logical.Backend, *grpc.ClientConn, error) {
	if b.multiplexingSupport {
		multiplexID, err := pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return nil, nil, err
		}

		instance, found := b.instances[multiplexID]
		if found {
			return instance.backend, instance.brokeredClient, nil
		}
	}

	fallback, found := b.instances[singleImplementationID]
	if !found {
		return nil, nil, fmt.Errorf("no backend instance found")
	}

	return fallback.backend, fallback.brokeredClient, nil
}
// getBackendAndBrokeredClient is the locking variant of
// getBackendAndBrokeredClientInternal: it performs the same lookup while
// holding instancesLock for reading.
func (b *backendGRPCPluginServer) getBackendAndBrokeredClient(ctx context.Context) (logical.Backend, *grpc.ClientConn, error) {
	b.instancesLock.RLock()
	defer b.instancesLock.RUnlock()

	backend, conn, err := b.getBackendAndBrokeredClientInternal(ctx)
	return backend, conn, err
}
// Setup dials into the plugin's broker to get a shimmed storage, system view,
// and events sender for the backend, then instantiates the underlying backend
// through the server's factory func and registers it as an instance.
//
// The instance is stored under the multiplex ID carried in ctx when
// multiplexing is supported, and under singleImplementationID otherwise.
//
// A failure to resolve the multiplex ID or to dial the broker is returned as
// the RPC error. A factory failure is instead reported through SetupReply.Err
// with a nil RPC error; in that case the freshly dialed broker connection is
// closed and no instance is registered.
func (b *backendGRPCPluginServer) Setup(ctx context.Context, args *pb.SetupArgs) (*pb.SetupReply, error) {
	var err error
	id := singleImplementationID

	if b.multiplexingSupport {
		id, err = pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return &pb.SetupReply{}, err
		}
	}

	// Dial for storage
	brokeredClient, err := b.broker.Dial(args.BrokerID)
	if err != nil {
		return &pb.SetupReply{}, err
	}

	storage := newGRPCStorageClient(brokeredClient)
	sysView := newGRPCSystemView(brokeredClient)
	events := newGRPCEventsClient(brokeredClient)

	config := &logical.BackendConfig{
		StorageView:  storage,
		Logger:       b.logger,
		System:       sysView,
		Config:       args.Config,
		BackendUUID:  args.BackendUUID,
		EventsSender: events,
	}

	// Call the underlying backend factory after the shims have been created
	// so the new backend is built on top of them.
	backend, err := b.factory(ctx, config)
	if err != nil {
		// The connection is not registered anywhere on this path, so close it
		// here rather than leak it. The factory error is the one reported to
		// the caller, so a Close failure is deliberately dropped.
		_ = brokeredClient.Close()

		return &pb.SetupReply{
			Err: pb.ErrToString(err),
		}, nil
	}

	b.instancesLock.Lock()
	defer b.instancesLock.Unlock()

	b.instances[id] = backendInstance{
		brokeredClient: brokeredClient,
		backend:        backend,
	}

	return &pb.SetupReply{}, nil
}
// HandleRequest converts the protobuf request into a logical request, attaches
// a storage client built on the instance's brokered connection, and dispatches
// it to the backend selected by ctx.
//
// Lookup failures, metadata mode (ErrServerInMetadataMode), and request or
// response conversion failures are returned as the RPC error. An error
// produced by the backend itself is carried in the reply's Err field alongside
// the converted response, with a nil RPC error.
func (b *backendGRPCPluginServer) HandleRequest(ctx context.Context, args *pb.HandleRequestArgs) (*pb.HandleRequestReply, error) {
	backend, conn, err := b.getBackendAndBrokeredClient(ctx)
	switch {
	case err != nil:
		return &pb.HandleRequestReply{}, err
	case pluginutil.InMetadataMode():
		return &pb.HandleRequestReply{}, ErrServerInMetadataMode
	}

	req, err := pb.ProtoRequestToLogicalRequest(args.Request)
	if err != nil {
		return &pb.HandleRequestReply{}, err
	}
	req.Storage = newGRPCStorageClient(conn)

	// The backend's own error travels inside the reply, not as the RPC error.
	resp, handleErr := backend.HandleRequest(ctx, req)

	protoResp, err := pb.LogicalResponseToProtoResponse(resp)
	if err != nil {
		return &pb.HandleRequestReply{}, err
	}

	reply := &pb.HandleRequestReply{
		Response: protoResp,
		Err:      pb.ErrToProtoErr(handleErr),
	}
	return reply, nil
}
// Initialize calls Initialize on the backend selected by ctx, passing it an
// initialization request whose storage is a client built on the instance's
// brokered connection.
//
// Lookup failures and metadata mode (ErrServerInMetadataMode) are returned as
// the RPC error; an error from the backend is carried in the reply's Err field
// with a nil RPC error.
func (b *backendGRPCPluginServer) Initialize(ctx context.Context, _ *pb.InitializeArgs) (*pb.InitializeReply, error) {
	backend, conn, err := b.getBackendAndBrokeredClient(ctx)
	switch {
	case err != nil:
		return &pb.InitializeReply{}, err
	case pluginutil.InMetadataMode():
		return &pb.InitializeReply{}, ErrServerInMetadataMode
	}

	initReq := &logical.InitializationRequest{
		Storage: newGRPCStorageClient(conn),
	}
	initErr := backend.Initialize(ctx, initReq)

	return &pb.InitializeReply{Err: pb.ErrToProtoErr(initErr)}, nil
}
// SpecialPaths reports the special paths declared by the backend selected by
// ctx, copied field by field into the protobuf Paths message. The reply's
// Paths is left nil when the backend declares none. A lookup failure is
// returned as the RPC error.
func (b *backendGRPCPluginServer) SpecialPaths(ctx context.Context, args *pb.Empty) (*pb.SpecialPathsReply, error) {
	backend, _, err := b.getBackendAndBrokeredClient(ctx)
	if err != nil {
		return &pb.SpecialPathsReply{}, err
	}

	reply := &pb.SpecialPathsReply{}
	if special := backend.SpecialPaths(); special != nil {
		reply.Paths = &pb.Paths{
			Root:                  special.Root,
			Unauthenticated:       special.Unauthenticated,
			LocalStorage:          special.LocalStorage,
			SealWrapStorage:       special.SealWrapStorage,
			WriteForwardedStorage: special.WriteForwardedStorage,
			Binary:                special.Binary,
			Limited:               special.Limited,
		}
	}

	return reply, nil
}
// HandleExistenceCheck converts the protobuf request into a logical request,
// attaches a storage client built on the instance's brokered connection, and
// runs the existence check on the backend selected by ctx.
//
// Lookup failures, metadata mode (ErrServerInMetadataMode), and request
// conversion failures are returned as the RPC error. The backend's own result,
// including any error, is carried in the reply with a nil RPC error.
func (b *backendGRPCPluginServer) HandleExistenceCheck(ctx context.Context, args *pb.HandleExistenceCheckArgs) (*pb.HandleExistenceCheckReply, error) {
	backend, conn, err := b.getBackendAndBrokeredClient(ctx)
	switch {
	case err != nil:
		return &pb.HandleExistenceCheckReply{}, err
	case pluginutil.InMetadataMode():
		return &pb.HandleExistenceCheckReply{}, ErrServerInMetadataMode
	}

	req, err := pb.ProtoRequestToLogicalRequest(args.Request)
	if err != nil {
		return &pb.HandleExistenceCheckReply{}, err
	}
	req.Storage = newGRPCStorageClient(conn)

	found, exists, checkErr := backend.HandleExistenceCheck(ctx, req)

	reply := &pb.HandleExistenceCheckReply{
		CheckFound: found,
		Exists:     exists,
		Err:        pb.ErrToProtoErr(checkErr),
	}
	return reply, nil
}
// Cleanup tears down the backend instance selected by ctx: it calls the
// backend's Cleanup, closes the instance's brokered connection, and removes
// the instance from the server's registry. The whole operation runs under the
// instances write lock.
//
// A failure to look up the instance, or to resolve the multiplex ID, is
// returned as the RPC error together with an empty reply.
func (b *backendGRPCPluginServer) Cleanup(ctx context.Context, _ *pb.Empty) (*pb.Empty, error) {
	b.instancesLock.Lock()
	defer b.instancesLock.Unlock()

	backend, brokeredClient, err := b.getBackendAndBrokeredClientInternal(ctx)
	if err != nil {
		return &pb.Empty{}, err
	}

	backend.Cleanup(ctx)

	// Close rpc clients. This is best effort: the instance is being discarded
	// regardless, so a Close failure is deliberately not surfaced.
	_ = brokeredClient.Close()

	// Resolve the registry key the same way Setup does.
	id := singleImplementationID
	if b.multiplexingSupport {
		id, err = pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return &pb.Empty{}, err
		}
	}

	// delete is a no-op for a missing key, so no existence check is needed.
	delete(b.instances, id)

	return &pb.Empty{}, nil
}
// InvalidateKey forwards the key from args to InvalidateKey on the backend
// selected by ctx. Lookup failures and metadata mode
// (ErrServerInMetadataMode) are returned as the RPC error.
func (b *backendGRPCPluginServer) InvalidateKey(ctx context.Context, args *pb.InvalidateKeyArgs) (*pb.Empty, error) {
	backend, _, err := b.getBackendAndBrokeredClient(ctx)
	switch {
	case err != nil:
		return &pb.Empty{}, err
	case pluginutil.InMetadataMode():
		return &pb.Empty{}, ErrServerInMetadataMode
	}

	backend.InvalidateKey(ctx, args.Key)

	return &pb.Empty{}, nil
}
// Type reports the type of the backend selected by ctx, converted to uint32
// for the wire. A lookup failure is returned as the RPC error with an empty
// reply.
func (b *backendGRPCPluginServer) Type(ctx context.Context, _ *pb.Empty) (*pb.TypeReply, error) {
	reply := &pb.TypeReply{}

	backend, _, err := b.getBackendAndBrokeredClient(ctx)
	if err != nil {
		return reply, err
	}

	reply.Type = uint32(backend.Type())
	return reply, nil
}
// Version reports the plugin version of the backend selected by ctx. The
// version is taken from the backend when it implements
// logical.PluginVersioner; otherwise the reply carries an empty string. A
// lookup failure is returned as the RPC error with an empty reply.
func (b *backendGRPCPluginServer) Version(ctx context.Context, _ *logical.Empty) (*logical.VersionReply, error) {
	backend, _, err := b.getBackendAndBrokeredClient(ctx)
	if err != nil {
		return &logical.VersionReply{}, err
	}

	var pluginVersion string
	if versioner, ok := backend.(logical.PluginVersioner); ok {
		pluginVersion = versioner.PluginVersion().Version
	}

	return &logical.VersionReply{PluginVersion: pluginVersion}, nil
}