vault/sdk/plugin/grpc_backend_server.go
John-Michael Faircloth 07927e036c
feature: secrets/auth plugin multiplexing (#14946)
* enable registering backend muxed plugins in plugin catalog

* set the sysview on the pluginconfig to allow enabling secrets/auth plugins

* store backend instances in map

* store single implementations in the instances map

cleanup instance map and ensure we don't deadlock

* fix system backend unit tests

move GetMultiplexIDFromContext to pluginutil package

fix pluginutil test

fix dbplugin ut

* return error(s) if we can't get the plugin client

update comments

* refactor/move GetMultiplexIDFromContext test

* add changelog

* remove unnecessary field on pluginClient

* add unit tests to PluginCatalog for secrets/auth plugins

* fix comment

* return pluginClient from TestRunTestPlugin

* add multiplexed backend test

* honor metadatamode value in newbackend pluginconfig

* check that connection exists on cleanup

* add automtls to secrets/auth plugins

* don't remove apiclientmeta parsing

* use formatting directive for fmt.Errorf

* fix ut: remove tls provider func

* remove tlsproviderfunc from backend plugin tests

* use env var to prevent test plugin from running as a unit test

* WIP: remove lazy loading

* move non lazy loaded backend to new package

* use version wrapper for backend plugin factory

* remove backendVersionWrapper type

* implement getBackendPluginType for plugin catalog

* handle backend plugin v4 registration

* add plugin automtls env guard

* modify plugin factory to determine the backend to use

* remove old pluginsets from v5 and log pid in plugin catalog

* add reload mechanism via context

* readd v3 and v4 to pluginset

* call cleanup from reload if non-muxed

* move v5 backend code to new package

* use context reload for ErrPluginShutdown case

* add wrapper on v5 backend

* fix run config UTs

* fix unit tests

- use v4/v5 mapping for plugin versions
- fix test build err
- add reload method on fakePluginClient
- add multiplexed cases for integration tests

* remove comment and update AutoMTLS field in test

* remove comment

* remove errwrap and unused context

* only support metadatamode false for v5 backend plugins

* update plugin catalog errors

* use const for env variables

* rename locks and remove unused

* remove unneeded nil check

* improvements based on staticcheck recommendations

* use const for single implementation string

* use const for context key

* use info default log level

* move pid to pluginClient struct

* remove v3 and v4 from multiplexed plugin set

* return from reload when non-multiplexed

* update automtls env string

* combine getBackend and getBrokeredClient

* update comments for plugin reload, Backend return val and log

* revert Backend return type

* allow non-muxed plugins to serve v5

* move v5 code to existing sdk plugin package

* do not export sdk fields now that we have removed extra plugin pkg

* set TLSProvider in ServeMultiplex for backwards compat

* use bool to flag multiplexing support on grpc backend server

* revert userpass main.go

* refactor plugin sdk

- update comments
- make use of multiplexing boolean and single implementation ID const

* update comment and use multierr

* attempt v4 if dispense fails on getPluginTypeForUnknown

* update comments on sdk plugin backend
2022-08-29 21:42:26 -05:00

269 lines
7.0 KiB
Go

package plugin
import (
"context"
"errors"
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/plugin/pb"
"google.golang.org/grpc"
)
// ErrServerInMetadataMode is returned by the RPC handlers in this file that
// refuse to run while pluginutil.InMetadataMode() reports true.
var ErrServerInMetadataMode = errors.New("plugin server can not perform action while in metadata mode")
// singleImplementationID is the string used to define the instance ID of a
// non-multiplexed plugin. It is the key under which Setup stores the backend
// instance when multiplexing is not supported.
const singleImplementationID string = "single"
// backendInstance pairs a backend created by Setup with the brokered gRPC
// connection that was dialed for it.
type backendInstance struct {
	// brokeredClient is the connection obtained from the broker in Setup; it
	// backs the storage and system-view shims and is closed in Cleanup.
	brokeredClient *grpc.ClientConn
	// backend is the logical backend returned by the server's factory.
	backend logical.Backend
}
// backendGRPCPluginServer is the plugin-side gRPC server that forwards RPCs
// to logical.Backend instances created through factory.
type backendGRPCPluginServer struct {
	// NOTE(review): presumably embedded so that RPCs not implemented here
	// still satisfy the generated server interface — confirm against pb.
	pb.UnimplementedBackendServer
	// broker is used by Setup to dial the connection identified by the
	// BrokerID in the setup arguments.
	broker *plugin.GRPCBroker
	// instances maps an instance ID (the multiplex ID from the request
	// context, or singleImplementationID) to its backend and connection.
	instances map[string]backendInstance
	// instancesLock is taken by the accessors and by Cleanup around reads
	// and deletions of instances.
	instancesLock sync.RWMutex
	// multiplexingSupport selects whether the instance ID is read from the
	// request context or fixed to singleImplementationID.
	multiplexingSupport bool
	// factory builds the backend in Setup from the shimmed BackendConfig.
	factory logical.Factory
	// logger is handed to the backend through BackendConfig.Logger.
	logger log.Logger
}
// getBackendAndBrokeredClientInternal looks up the backend and its brokered
// connection for the request carried by ctx. It takes no lock itself; the
// caller is expected to hold instancesLock.
//
// With multiplexing enabled the multiplex ID from ctx is tried first (a
// failure to extract it is returned as-is). If no instance is registered
// under that ID, or multiplexing is disabled, the single-implementation
// entry is used. An error is returned when neither lookup succeeds.
func (b *backendGRPCPluginServer) getBackendAndBrokeredClientInternal(ctx context.Context) (logical.Backend, *grpc.ClientConn, error) {
	if b.multiplexingSupport {
		muxID, err := pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return nil, nil, err
		}

		if muxed, found := b.instances[muxID]; found {
			return muxed.backend, muxed.brokeredClient, nil
		}
	}

	fallback, found := b.instances[singleImplementationID]
	if !found {
		return nil, nil, fmt.Errorf("no backend instance found")
	}

	return fallback.backend, fallback.brokeredClient, nil
}
// getBackendAndBrokeredClient is the read-locked wrapper around
// getBackendAndBrokeredClientInternal: it holds instancesLock for reading for
// the duration of the lookup and returns the lookup's results unchanged.
func (b *backendGRPCPluginServer) getBackendAndBrokeredClient(ctx context.Context) (logical.Backend, *grpc.ClientConn, error) {
	b.instancesLock.RLock()
	defer b.instancesLock.RUnlock()

	impl, conn, err := b.getBackendAndBrokeredClientInternal(ctx)
	return impl, conn, err
}
// Setup dials into the plugin's broker to get a shimmed storage, logger, and
// system view of the backend. This method also instantiates the underlying
// backend through its factory func for the server side of the plugin and
// registers the result in b.instances.
//
// The instance is stored under the multiplex ID taken from ctx when
// multiplexing is supported, and under singleImplementationID otherwise.
// Failing to resolve the ID or to dial the broker is returned as an RPC
// error; a factory failure is reported through SetupReply.Err with a nil RPC
// error.
func (b *backendGRPCPluginServer) Setup(ctx context.Context, args *pb.SetupArgs) (*pb.SetupReply, error) {
	var err error
	id := singleImplementationID

	if b.multiplexingSupport {
		id, err = pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return &pb.SetupReply{}, err
		}
	}

	// Dial for storage
	brokeredClient, err := b.broker.Dial(args.BrokerID)
	if err != nil {
		return &pb.SetupReply{}, err
	}

	storage := newGRPCStorageClient(brokeredClient)
	sysView := newGRPCSystemView(brokeredClient)

	config := &logical.BackendConfig{
		StorageView: storage,
		Logger:      b.logger,
		System:      sysView,
		Config:      args.Config,
		BackendUUID: args.BackendUUID,
	}

	// Call the underlying backend factory only after the shims have been
	// created, since the factory receives them through config.
	backend, err := b.factory(ctx, config)
	if err != nil {
		// The connection will never be registered in b.instances, so nothing
		// else would close it. The factory error is what the caller needs to
		// see; a failure to close is deliberately not reported.
		_ = brokeredClient.Close()

		return &pb.SetupReply{
			Err: pb.ErrToString(err),
		}, nil
	}

	// The map is read and deleted from under instancesLock by the other
	// handlers, so the write must hold the lock too. It is taken only around
	// the map update so a slow factory does not block other instances.
	b.instancesLock.Lock()
	defer b.instancesLock.Unlock()

	// Writing to a nil map panics; tolerate a server value that was built
	// without initializing the map.
	if b.instances == nil {
		b.instances = make(map[string]backendInstance)
	}

	b.instances[id] = backendInstance{
		brokeredClient: brokeredClient,
		backend:        backend,
	}

	return &pb.SetupReply{}, nil
}
// HandleRequest dispatches a request to the backend instance selected by ctx.
//
// The protobuf request is converted to a logical request whose Storage is a
// client over the instance's brokered connection. Lookup and conversion
// failures, and ErrServerInMetadataMode when running in metadata mode, are
// returned as RPC errors. An error from the backend itself is carried in the
// reply's Err field alongside the converted response.
func (b *backendGRPCPluginServer) HandleRequest(ctx context.Context, args *pb.HandleRequestArgs) (*pb.HandleRequestReply, error) {
	impl, conn, lookupErr := b.getBackendAndBrokeredClient(ctx)
	switch {
	case lookupErr != nil:
		return &pb.HandleRequestReply{}, lookupErr
	case pluginutil.InMetadataMode():
		return &pb.HandleRequestReply{}, ErrServerInMetadataMode
	}

	req, convErr := pb.ProtoRequestToLogicalRequest(args.Request)
	if convErr != nil {
		return &pb.HandleRequestReply{}, convErr
	}
	req.Storage = newGRPCStorageClient(conn)

	// The backend's own error travels inside the reply rather than as the
	// RPC error, so it is kept separate from the conversion error below.
	resp, handlerErr := impl.HandleRequest(ctx, req)

	protoResp, convErr := pb.LogicalResponseToProtoResponse(resp)
	if convErr != nil {
		return &pb.HandleRequestReply{}, convErr
	}

	reply := &pb.HandleRequestReply{
		Response: protoResp,
		Err:      pb.ErrToProtoErr(handlerErr),
	}
	return reply, nil
}
// Initialize calls Initialize on the backend instance selected by ctx, giving
// it a storage client over the instance's brokered connection.
//
// A lookup failure, or ErrServerInMetadataMode when running in metadata mode,
// is returned as the RPC error. The backend's own initialization error is
// carried in the reply's Err field.
func (b *backendGRPCPluginServer) Initialize(ctx context.Context, _ *pb.InitializeArgs) (*pb.InitializeReply, error) {
	impl, conn, lookupErr := b.getBackendAndBrokeredClient(ctx)
	switch {
	case lookupErr != nil:
		return &pb.InitializeReply{}, lookupErr
	case pluginutil.InMetadataMode():
		return &pb.InitializeReply{}, ErrServerInMetadataMode
	}

	initErr := impl.Initialize(ctx, &logical.InitializationRequest{
		Storage: newGRPCStorageClient(conn),
	})

	reply := &pb.InitializeReply{
		Err: pb.ErrToProtoErr(initErr),
	}
	return reply, nil
}
// SpecialPaths reports the special paths of the backend instance selected by
// ctx. When the backend returns nil, the reply's Paths field is left nil;
// otherwise the Root, Unauthenticated, LocalStorage and SealWrapStorage lists
// are copied into the reply. A lookup failure is returned as the RPC error.
func (b *backendGRPCPluginServer) SpecialPaths(ctx context.Context, args *pb.Empty) (*pb.SpecialPathsReply, error) {
	impl, _, lookupErr := b.getBackendAndBrokeredClient(ctx)
	if lookupErr != nil {
		return &pb.SpecialPathsReply{}, lookupErr
	}

	reply := &pb.SpecialPathsReply{}
	if special := impl.SpecialPaths(); special != nil {
		reply.Paths = &pb.Paths{
			Root:            special.Root,
			Unauthenticated: special.Unauthenticated,
			LocalStorage:    special.LocalStorage,
			SealWrapStorage: special.SealWrapStorage,
		}
	}

	return reply, nil
}
// HandleExistenceCheck runs an existence check on the backend instance
// selected by ctx.
//
// The protobuf request is converted to a logical request whose Storage is a
// client over the instance's brokered connection. Lookup and conversion
// failures, and ErrServerInMetadataMode when running in metadata mode, are
// returned as RPC errors. The backend's results, including its error, are
// carried in the reply.
func (b *backendGRPCPluginServer) HandleExistenceCheck(ctx context.Context, args *pb.HandleExistenceCheckArgs) (*pb.HandleExistenceCheckReply, error) {
	impl, conn, lookupErr := b.getBackendAndBrokeredClient(ctx)
	switch {
	case lookupErr != nil:
		return &pb.HandleExistenceCheckReply{}, lookupErr
	case pluginutil.InMetadataMode():
		return &pb.HandleExistenceCheckReply{}, ErrServerInMetadataMode
	}

	req, convErr := pb.ProtoRequestToLogicalRequest(args.Request)
	if convErr != nil {
		return &pb.HandleExistenceCheckReply{}, convErr
	}
	req.Storage = newGRPCStorageClient(conn)

	found, present, checkErr := impl.HandleExistenceCheck(ctx, req)

	reply := &pb.HandleExistenceCheckReply{
		CheckFound: found,
		Exists:     present,
		Err:        pb.ErrToProtoErr(checkErr),
	}
	return reply, nil
}
// Cleanup tears down the backend instance selected by ctx: it calls the
// backend's Cleanup, closes the instance's brokered connection and removes
// the instance from b.instances. The write lock is held throughout so no
// other handler can look the instance up while it is being torn down.
//
// A failure to find the instance or to resolve its ID is returned as the RPC
// error, in which case nothing has been cleaned up. Every path returns a
// non-nil reply.
func (b *backendGRPCPluginServer) Cleanup(ctx context.Context, _ *pb.Empty) (*pb.Empty, error) {
	b.instancesLock.Lock()
	defer b.instancesLock.Unlock()

	backend, brokeredClient, err := b.getBackendAndBrokeredClientInternal(ctx)
	if err != nil {
		return &pb.Empty{}, err
	}

	// Resolve the map key before tearing anything down so that an error here
	// cannot leave a cleaned-up instance registered.
	id := singleImplementationID
	if b.multiplexingSupport {
		id, err = pluginutil.GetMultiplexIDFromContext(ctx)
		if err != nil {
			return &pb.Empty{}, err
		}
	}

	backend.Cleanup(ctx)

	// Close rpc clients. This is best-effort: the instance is being discarded
	// either way, so a close error is deliberately not reported.
	_ = brokeredClient.Close()

	// delete is a no-op for a missing key, so no existence check is needed.
	delete(b.instances, id)

	return &pb.Empty{}, nil
}
// InvalidateKey forwards the key in args to InvalidateKey on the backend
// instance selected by ctx. A lookup failure, or ErrServerInMetadataMode when
// running in metadata mode, is returned as the RPC error.
func (b *backendGRPCPluginServer) InvalidateKey(ctx context.Context, args *pb.InvalidateKeyArgs) (*pb.Empty, error) {
	impl, _, lookupErr := b.getBackendAndBrokeredClient(ctx)
	switch {
	case lookupErr != nil:
		return &pb.Empty{}, lookupErr
	case pluginutil.InMetadataMode():
		return &pb.Empty{}, ErrServerInMetadataMode
	}

	impl.InvalidateKey(ctx, args.Key)

	return &pb.Empty{}, nil
}
// Type reports the type of the backend instance selected by ctx, converted
// to uint32 for the wire. A lookup failure is returned as the RPC error.
func (b *backendGRPCPluginServer) Type(ctx context.Context, _ *pb.Empty) (*pb.TypeReply, error) {
	impl, _, lookupErr := b.getBackendAndBrokeredClient(ctx)
	if lookupErr != nil {
		return &pb.TypeReply{}, lookupErr
	}

	reply := &pb.TypeReply{}
	reply.Type = uint32(impl.Type())
	return reply, nil
}