storage/raft: Support storage migration to raft storage (#7207)

* Support raft in the migration command

* Add comments
This commit is contained in:
Brian Kassouf 2019-07-29 13:05:43 -07:00 committed by GitHub
parent 7932afafe2
commit e88721c7db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 190 additions and 68 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"net/url"
"os"
"sort"
"strings"
@ -14,6 +15,7 @@ import (
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/hashicorp/vault/command/server"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/vault"
@ -41,6 +43,7 @@ type OperatorMigrateCommand struct {
// migratorConfig holds the settings the operator migrate command works from:
// the two storage stanzas plus any destination-specific options.
type migratorConfig struct {
// StorageSource describes the source storage stanza. The `hcl:"-"` tag means
// it is not filled in by plain HCL struct decoding.
// NOTE(review): presumably the backend data is migrated *from*, populated by
// the config loader — confirm against loadMigratorConfig.
StorageSource *server.Storage `hcl:"-"`
// StorageDestination describes the destination storage stanza; its Type and
// Config are handed to createDestinationBackend by migrate. Like
// StorageSource it is excluded from plain HCL struct decoding.
StorageDestination *server.Storage `hcl:"-"`
// ClusterAddr is read from the `cluster_addr` config key. It is required
// when the destination is "raft": createDestinationBackend parses it as a
// URL and uses the host portion as the address of the bootstrapped raft peer.
ClusterAddr string `hcl:"cluster_addr"`
}
func (c *OperatorMigrateCommand) Synopsis() string {
@ -155,7 +158,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
return nil
}
to, err := c.newBackend(config.StorageDestination.Type, config.StorageDestination.Config)
to, err := c.createDestinationBackend(config.StorageDestination.Type, config.StorageDestination.Config, config)
if err != nil {
return errwrap.Wrapf("error mounting 'storage_destination': {{err}}", err)
}
@ -228,6 +231,46 @@ func (c *OperatorMigrateCommand) newBackend(kind string, conf map[string]string)
return factory(conf, c.logger)
}
// createDestinationBackend builds the destination storage backend and performs
// any extra initialization the backend type needs before data can be written
// to it.
//
// kind and conf are the destination storage type and its configuration map and
// are passed straight to newBackend. config supplies command-level settings;
// only ClusterAddr is consulted, and only for raft.
//
// For every kind other than "raft" the backend returned by newBackend is
// handed back untouched. For "raft" the backend is bootstrapped as a
// single-peer cluster whose address is the host portion of config.ClusterAddr,
// and then started as leader so that writes are accepted.
//
// An error is returned if the backend cannot be created, if cluster_addr is
// missing, unparsable or has no host component, if the created backend is not
// a *raft.RaftBackend, or if bootstrapping or starting raft fails.
func (c *OperatorMigrateCommand) createDestinationBackend(kind string, conf map[string]string, config *migratorConfig) (physical.Backend, error) {
	storage, err := c.newBackend(kind, conf)
	if err != nil {
		return nil, err
	}

	// Only raft needs additional setup; everything else is ready to use.
	if kind != "raft" {
		return storage, nil
	}

	if len(config.ClusterAddr) == 0 {
		return nil, errors.New("cluster_addr config not set")
	}

	raftStorage, ok := storage.(*raft.RaftBackend)
	if !ok {
		return nil, errors.New("wrong storage type for raft backend")
	}

	parsedClusterAddr, err := url.Parse(config.ClusterAddr)
	if err != nil {
		return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
	}
	// url.Parse accepts scheme-less values such as "localhost:8201" without an
	// error (it treats "localhost" as the scheme) and leaves Host empty.
	// Bootstrapping a peer with an empty address would produce an unusable
	// raft configuration, so reject it up front.
	if parsedClusterAddr.Host == "" {
		return nil, fmt.Errorf("error parsing cluster address: no host found in %q; expected a URL such as https://127.0.0.1:8201", config.ClusterAddr)
	}

	ctx := context.Background()

	// Register this node as the sole member of the new raft cluster.
	peers := []raft.Peer{
		{
			ID:      raftStorage.NodeID(),
			Address: parsedClusterAddr.Host,
		},
	}
	if err := raftStorage.Bootstrap(ctx, peers); err != nil {
		return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
	}

	// Start as leader so the migration can write immediately instead of
	// waiting for an election.
	if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
		StartAsLeader: true,
	}); err != nil {
		return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err)
	}

	return storage, nil
}
// loadMigratorConfig loads the configuration at the given path
func (c *OperatorMigrateCommand) loadMigratorConfig(path string) (*migratorConfig, error) {
fi, err := os.Stat(path)

View File

@ -254,7 +254,7 @@ func (b *RaftBackend) Initialized() bool {
// SetTLSKeyring is used to install a new keyring. If the active key has changed
// it will also close any network connections or streams forcing a reconnect
// with the new key.
func (b *RaftBackend) SetTLSKeyring(keyring *RaftTLSKeyring) error {
func (b *RaftBackend) SetTLSKeyring(keyring *TLSKeyring) error {
b.l.RLock()
err := b.streamLayer.setTLSKeyring(keyring)
b.l.RUnlock()
@ -346,9 +346,23 @@ func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
return nil
}
// SetupOpts are used to pass options to the raft setup function
// (RaftBackend.SetupCluster).
type SetupOpts struct {
// TLSKeyring is the keyring to use for the cluster traffic. It is expected
// to be provided together with ClusterListener: when both are nil
// SetupCluster uses an in-memory transport instead of a network one, and
// when only one of the two is nil SetupCluster returns an error.
TLSKeyring *TLSKeyring
// ClusterListener is the cluster hook used to register the raft handler and
// client with core's cluster listeners. SetupCluster also obtains the base
// TLS config and the local address for the raft stream layer from it.
ClusterListener cluster.ClusterHook
// StartAsLeader is used to specify this node should start as leader and
// bypass the leader election. This should be used with caution.
// SetupCluster copies the value into the raft config, and may set it to
// true itself when the bootstrap configuration contains a single server.
StartAsLeader bool
}
// SetupCluster starts the raft cluster and enables the networking needed for
// the raft nodes to communicate.
func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSKeyring, clusterListener cluster.ClusterHook) error {
func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
b.logger.Trace("setting up raft cluster")
b.l.Lock()
@ -371,24 +385,24 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK
}
switch {
case raftTLSKeyring == nil && clusterListener == nil:
case opts.TLSKeyring == nil && opts.ClusterListener == nil:
// If we don't have a provided network we use an in-memory one.
// This allows us to bootstrap a node without bringing up a cluster
// network. This will be true during bootstrap, tests and dev modes.
_, b.raftTransport = raft.NewInmemTransportWithTimeout(raft.ServerAddress(b.localID), time.Second)
case raftTLSKeyring == nil:
case opts.TLSKeyring == nil:
return errors.New("no keyring provided")
case clusterListener == nil:
case opts.ClusterListener == nil:
return errors.New("no cluster listener provided")
default:
// Load the base TLS config from the cluster listener.
baseTLSConfig, err := clusterListener.TLSConfig(ctx)
baseTLSConfig, err := opts.ClusterListener.TLSConfig(ctx)
if err != nil {
return err
}
// Set the local address and localID in the streaming layer and the raft config.
streamLayer, err := NewRaftLayer(b.logger.Named("stream"), raftTLSKeyring, clusterListener.Addr(), baseTLSConfig)
streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener.Addr(), baseTLSConfig)
if err != nil {
return err
}
@ -422,10 +436,11 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK
}
// If we are the only node we should start as the leader.
if len(bootstrapConfig.Servers) == 1 {
raftConfig.StartAsLeader = true
opts.StartAsLeader = true
}
}
raftConfig.StartAsLeader = opts.StartAsLeader
// Setup the Raft store.
b.fsm.SetNoopRestore(true)
@ -463,10 +478,10 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK
if b.streamLayer != nil {
// Add Handler to the cluster.
clusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer)
opts.ClusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer)
// Add Client to the cluster.
clusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
}
return nil
@ -851,8 +866,10 @@ func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-ch
leaderLost := make(chan struct{})
go func() {
select {
case <-leaderNotifyCh:
close(leaderLost)
case isLeader := <-leaderNotifyCh:
if !isLeader {
close(leaderLost)
}
case <-stopCh:
}
}()

View File

@ -60,7 +60,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
t.Fatal(err)
}
err = backend.SetupCluster(context.Background(), nil, nil)
err = backend.SetupCluster(context.Background(), SetupOpts{})
if err != nil {
t.Fatal(err)
}
@ -259,9 +259,9 @@ func TestRaft_Recovery(t *testing.T) {
}
// Bring up the nodes again
raft1.SetupCluster(context.Background(), nil, nil)
raft2.SetupCluster(context.Background(), nil, nil)
raft4.SetupCluster(context.Background(), nil, nil)
raft1.SetupCluster(context.Background(), SetupOpts{})
raft2.SetupCluster(context.Background(), SetupOpts{})
raft4.SetupCluster(context.Background(), SetupOpts{})
peers, err := raft1.Peers(context.Background())
if err != nil {

View File

@ -38,7 +38,7 @@ func addPeer(t *testing.T, leader, follower *RaftBackend) {
t.Fatal(err)
}
err = follower.SetupCluster(context.Background(), nil, nil)
err = follower.SetupCluster(context.Background(), SetupOpts{})
if err != nil {
t.Fatal(err)
}
@ -327,7 +327,7 @@ func TestRaft_Snapshot_Restart(t *testing.T) {
}
// Start Raft
err = raft1.SetupCluster(context.Background(), nil, nil)
err = raft1.SetupCluster(context.Background(), SetupOpts{})
if err != nil {
t.Fatal(err)
}

View File

@ -26,8 +26,8 @@ import (
"github.com/hashicorp/vault/vault/cluster"
)
// RaftTLSKey is a single TLS keypair in the Keyring
type RaftTLSKey struct {
// TLSKey is a single TLS keypair in the Keyring
type TLSKey struct {
// ID is a unique identifier for this Key
ID string `json:"id"`
@ -52,12 +52,12 @@ type RaftTLSKey struct {
parsedKey *ecdsa.PrivateKey
}
// RaftTLSKeyring is the set of keys that raft uses for network communication.
// TLSKeyring is the set of keys that raft uses for network communication.
// Only one key is used to dial at a time but both keys will be used to accept
// connections.
type RaftTLSKeyring struct {
type TLSKeyring struct {
// Keys is the set of available key pairs
Keys []*RaftTLSKey `json:"keys"`
Keys []*TLSKey `json:"keys"`
// AppliedIndex is the earliest known raft index that safely contains the
// latest key in the keyring.
@ -73,7 +73,7 @@ type RaftTLSKeyring struct {
}
// GetActive returns the active key.
func (k *RaftTLSKeyring) GetActive() *RaftTLSKey {
func (k *TLSKeyring) GetActive() *TLSKey {
if k.ActiveKeyID == "" {
return nil
}
@ -86,7 +86,7 @@ func (k *RaftTLSKeyring) GetActive() *RaftTLSKey {
return nil
}
func GenerateTLSKey() (*RaftTLSKey, error) {
func GenerateTLSKey() (*TLSKey, error) {
key, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader)
if err != nil {
return nil, err
@ -120,7 +120,7 @@ func GenerateTLSKey() (*RaftTLSKey, error) {
return nil, errwrap.Wrapf("unable to generate local cluster certificate: {{err}}", err)
}
return &RaftTLSKey{
return &TLSKey{
ID: host,
KeyType: certutil.PrivateKeyTypeP521,
CertBytes: certBytes,
@ -161,13 +161,13 @@ type raftLayer struct {
dialerFunc func(string, time.Duration) (net.Conn, error)
// TLS config
keyring *RaftTLSKeyring
keyring *TLSKeyring
baseTLSConfig *tls.Config
}
// NewRaftLayer creates a new raftLayer object. It parses the TLS information
// from the network config.
func NewRaftLayer(logger log.Logger, raftTLSKeyring *RaftTLSKeyring, clusterAddr net.Addr, baseTLSConfig *tls.Config) (*raftLayer, error) {
func NewRaftLayer(logger log.Logger, raftTLSKeyring *TLSKeyring, clusterAddr net.Addr, baseTLSConfig *tls.Config) (*raftLayer, error) {
switch {
case clusterAddr == nil:
// Clustering disabled on the server, don't try to look for params
@ -189,7 +189,7 @@ func NewRaftLayer(logger log.Logger, raftTLSKeyring *RaftTLSKeyring, clusterAddr
return layer, nil
}
func (l *raftLayer) setTLSKeyring(keyring *RaftTLSKeyring) error {
func (l *raftLayer) setTLSKeyring(keyring *TLSKeyring) error {
// Fast path a noop update
if l.keyring != nil && l.keyring.Term == keyring.Term {
return nil

View File

@ -11,8 +11,6 @@ import (
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/pgpkeys"
@ -150,16 +148,18 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
if err != nil {
return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
}
if err := c.underlyingPhysical.(*raft.RaftBackend).Bootstrap(ctx, []raft.Peer{
if err := raftStorage.Bootstrap(ctx, []raft.Peer{
{
ID: c.underlyingPhysical.(*raft.RaftBackend).NodeID(),
ID: raftStorage.NodeID(),
Address: parsedClusterAddr.Host,
},
}); err != nil {
return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
}
if err := raftStorage.SetupCluster(ctx, nil, nil); err != nil {
if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
StartAsLeader: true,
}); err != nil {
return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err)
}
@ -297,24 +297,9 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
results.RootToken = base64.StdEncoding.EncodeToString(encryptedVals[0])
}
if _, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
raftTLS, err := raft.GenerateTLSKey()
if err != nil {
return nil, err
}
keyring := &raft.RaftTLSKeyring{
Keys: []*raft.RaftTLSKey{raftTLS},
ActiveKeyID: raftTLS.ID,
}
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
if err != nil {
return nil, err
}
if err := c.barrier.Put(ctx, entry); err != nil {
return nil, err
}
if err := c.createRaftTLSKeyring(ctx); err != nil {
c.logger.Error("failed to create raft TLS keyring", "error", err)
return nil, err
}
// Prepare to re-seal

View File

@ -256,7 +256,7 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
if tlsKeyringEntry == nil {
return nil, errors.New("could not find raft TLS configuration")
}
var keyring raft.RaftTLSKeyring
var keyring raft.TLSKeyring
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
return nil, errors.New("could not decode raft TLS configuration")
}

View File

@ -78,7 +78,7 @@ func (s *raftFollowerStates) minIndex() uint64 {
// startRaftStorage will call SetupCluster in the raft backend which starts raft
// up and enables the cluster handler.
func (c *Core) startRaftStorage(ctx context.Context) error {
func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
if !ok {
return nil
@ -92,20 +92,69 @@ func (c *Core) startRaftStorage(ctx context.Context) error {
if err != nil {
return err
}
if raftTLSEntry == nil {
return errors.New("could not find raft TLS configuration")
}
raftTLS := new(raft.RaftTLSKeyring)
if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil {
return err
var creating bool
var raftTLS *raft.TLSKeyring
switch raftTLSEntry {
case nil:
// If we did not find a TLS keyring we will attempt to create one here.
// This happens after a storage migration process. This node is also
// marked to start as leader so we can write the new TLS Key. This is an
// error condition if there are already multiple nodes in the cluster,
// and the below storage write will fail. If the cluster is somehow in
// this state the unseal will fail and a cluster recovery will need to
// be done.
creating = true
raftTLSKey, err := raft.GenerateTLSKey()
if err != nil {
return err
}
raftTLS = &raft.TLSKeyring{
Keys: []*raft.TLSKey{raftTLSKey},
ActiveKeyID: raftTLSKey.ID,
}
default:
raftTLS = new(raft.TLSKeyring)
if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil {
return err
}
}
raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
if err := raftStorage.SetupCluster(ctx, raftTLS, c.clusterListener); err != nil {
if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
TLSKeyring: raftTLS,
ClusterListener: c.clusterListener,
StartAsLeader: creating,
}); err != nil {
return err
}
defer func() {
if retErr != nil {
c.logger.Info("stopping raft server")
if err := raftStorage.TeardownCluster(c.clusterListener); err != nil {
c.logger.Error("failed to stop raft server", "error", err)
}
}
}()
// If we are in need of creating the TLS keyring then we should write it out
// to storage here. If we fail it may mean we couldn't become leader and we
// should error out.
if creating {
c.logger.Info("writing raft TLS keyring to storage")
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, raftTLS)
if err != nil {
c.logger.Error("error marshaling raft TLS keyring", "error", err)
return err
}
if err := c.barrier.Put(ctx, entry); err != nil {
c.logger.Error("error writing raft TLS keyring", "error", err)
return err
}
}
return nil
}
@ -164,7 +213,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
c.raftTLSRotationStopCh = stopCh
c.raftFollowerStates = followerStates
readKeyring := func() (*raft.RaftTLSKeyring, error) {
readKeyring := func() (*raft.TLSKeyring, error) {
tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
if err != nil {
return nil, err
@ -172,7 +221,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
if tlsKeyringEntry == nil {
return nil, errors.New("no keyring found")
}
var keyring raft.RaftTLSKeyring
var keyring raft.TLSKeyring
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
return nil, err
}
@ -345,6 +394,31 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
return nil
}
// createRaftTLSKeyring generates a new raft TLS key, wraps it in a keyring
// in which that key is the only and active entry, and writes the keyring
// through the barrier at raftTLSStoragePath.
//
// It does nothing and returns nil when the underlying physical backend is not
// a raft backend. Any error from key generation, JSON encoding of the storage
// entry, or the barrier write is returned unchanged.
func (c *Core) createRaftTLSKeyring(ctx context.Context) error {
	if _, isRaft := c.underlyingPhysical.(*raft.RaftBackend); !isRaft {
		return nil
	}

	key, err := raft.GenerateTLSKey()
	if err != nil {
		return err
	}

	entry, err := logical.StorageEntryJSON(raftTLSStoragePath, &raft.TLSKeyring{
		Keys:        []*raft.TLSKey{key},
		ActiveKeyID: key.ID,
	})
	if err != nil {
		return err
	}

	return c.barrier.Put(ctx, entry)
}
func (c *Core) stopPeriodicRaftTLSRotate() {
if c.raftTLSRotationStopCh != nil {
close(c.raftTLSRotationStopCh)
@ -367,7 +441,7 @@ func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
return nil
}
var keyring raft.RaftTLSKeyring
var keyring raft.TLSKeyring
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
return err
}
@ -639,7 +713,10 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
}
raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
err = raftStorage.SetupCluster(ctx, answerResp.Data.TLSKeyring, c.clusterListener)
err = raftStorage.SetupCluster(ctx, raft.SetupOpts{
TLSKeyring: answerResp.Data.TLSKeyring,
ClusterListener: c.clusterListener,
})
if err != nil {
return errwrap.Wrapf("failed to setup raft cluster: {{err}}", err)
}
@ -656,6 +733,6 @@ type answerRespData struct {
}
type answerResp struct {
Peers []raft.Peer `json:"peers"`
TLSKeyring *raft.RaftTLSKeyring `json:"tls_keyring"`
Peers []raft.Peer `json:"peers"`
TLSKeyring *raft.TLSKeyring `json:"tls_keyring"`
}