From e88721c7db1eff6baf39cbb53a45ec35455b7ef4 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Mon, 29 Jul 2019 13:05:43 -0700 Subject: [PATCH] storage/raft: Support storage migration to raft storage (#7207) * Support raft in the migration command * Add comments --- command/operator_migrate.go | 45 +++++++++++++- physical/raft/raft.go | 41 +++++++++---- physical/raft/raft_test.go | 8 +-- physical/raft/snapshot_test.go | 4 +- physical/raft/streamlayer.go | 22 +++---- vault/init.go | 31 +++------- vault/logical_system_raft.go | 2 +- vault/raft.go | 105 ++++++++++++++++++++++++++++----- 8 files changed, 190 insertions(+), 68 deletions(-) diff --git a/command/operator_migrate.go b/command/operator_migrate.go index 5e44533da9..349bae33c0 100644 --- a/command/operator_migrate.go +++ b/command/operator_migrate.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io/ioutil" + "net/url" "os" "sort" "strings" @@ -14,6 +15,7 @@ import ( "github.com/hashicorp/hcl" "github.com/hashicorp/hcl/hcl/ast" "github.com/hashicorp/vault/command/server" + "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/logging" "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/vault" @@ -41,6 +43,7 @@ type OperatorMigrateCommand struct { type migratorConfig struct { StorageSource *server.Storage `hcl:"-"` StorageDestination *server.Storage `hcl:"-"` + ClusterAddr string `hcl:"cluster_addr"` } func (c *OperatorMigrateCommand) Synopsis() string { @@ -155,7 +158,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { return nil } - to, err := c.newBackend(config.StorageDestination.Type, config.StorageDestination.Config) + to, err := c.createDestinationBackend(config.StorageDestination.Type, config.StorageDestination.Config, config) if err != nil { return errwrap.Wrapf("error mounting 'storage_destination': {{err}}", err) } @@ -228,6 +231,46 @@ func (c *OperatorMigrateCommand) newBackend(kind string, conf map[string]string) return factory(conf, c.logger) } +func (c *OperatorMigrateCommand) createDestinationBackend(kind string, conf map[string]string, config *migratorConfig) (physical.Backend, error) { + storage, err := c.newBackend(kind, conf) + if err != nil { + return nil, err + } + + switch kind { + case "raft": + if len(config.ClusterAddr) == 0 { + return nil, errors.New("cluster_addr config not set") + } + + raftStorage, ok := storage.(*raft.RaftBackend) + if !ok { + return nil, errors.New("wrong storage type for raft backend") + } + + parsedClusterAddr, err := url.Parse(config.ClusterAddr) + if err != nil { + return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err) + } + if err := raftStorage.Bootstrap(context.Background(), []raft.Peer{ + { + ID: raftStorage.NodeID(), + Address: parsedClusterAddr.Host, + }, + }); err != nil { + return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err) + } + + if err := raftStorage.SetupCluster(context.Background(), raft.SetupOpts{ + StartAsLeader: true, + }); err != nil { + return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err) + } + } + + return storage, nil +} + // loadMigratorConfig loads the configuration at the given path func (c *OperatorMigrateCommand) loadMigratorConfig(path string) (*migratorConfig, error) { fi, err := os.Stat(path) diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 87a9976f29..dfbba0513d 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -254,7 +254,7 @@ func (b *RaftBackend) Initialized() bool { // SetTLSKeyring is used to install a new keyring. If the active key has changed // it will also close any network connections or streams forcing a reconnect // with the new key. -func (b *RaftBackend) SetTLSKeyring(keyring *RaftTLSKeyring) error { +func (b *RaftBackend) SetTLSKeyring(keyring *TLSKeyring) error { b.l.RLock() err := b.streamLayer.setTLSKeyring(keyring) b.l.RUnlock() @@ -346,9 +346,23 @@ func (b *RaftBackend) applyConfigSettings(config *raft.Config) error { return nil } +// SetupOpts are used to pass options to the raft setup function. +type SetupOpts struct { + // TLSKeyring is the keyring to use for the cluster traffic. + TLSKeyring *TLSKeyring + + // ClusterListener is the cluster hook used to register the raft handler and + // client with core's cluster listeners. + ClusterListener cluster.ClusterHook + + // StartAsLeader is used to specify this node should start as leader and + // bypass the leader election. This should be used with caution. + StartAsLeader bool +} + // SetupCluster starts the raft cluster and enables the networking needed for // the raft nodes to communicate. -func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSKeyring, clusterListener cluster.ClusterHook) error { +func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error { b.logger.Trace("setting up raft cluster") b.l.Lock() @@ -371,24 +385,24 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK } switch { - case raftTLSKeyring == nil && clusterListener == nil: + case opts.TLSKeyring == nil && opts.ClusterListener == nil: // If we don't have a provided network we use an in-memory one. // This allows us to bootstrap a node without bringing up a cluster // network. This will be true during bootstrap, tests and dev modes. _, b.raftTransport = raft.NewInmemTransportWithTimeout(raft.ServerAddress(b.localID), time.Second) - case raftTLSKeyring == nil: + case opts.TLSKeyring == nil: return errors.New("no keyring provided") - case clusterListener == nil: + case opts.ClusterListener == nil: return errors.New("no cluster listener provided") default: // Load the base TLS config from the cluster listener. - baseTLSConfig, err := clusterListener.TLSConfig(ctx) + baseTLSConfig, err := opts.ClusterListener.TLSConfig(ctx) if err != nil { return err } // Set the local address and localID in the streaming layer and the raft config. - streamLayer, err := NewRaftLayer(b.logger.Named("stream"), raftTLSKeyring, clusterListener.Addr(), baseTLSConfig) + streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener.Addr(), baseTLSConfig) if err != nil { return err } @@ -422,10 +436,11 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK } // If we are the only node we should start as the leader. if len(bootstrapConfig.Servers) == 1 { - raftConfig.StartAsLeader = true + opts.StartAsLeader = true } } + raftConfig.StartAsLeader = opts.StartAsLeader // Setup the Raft store. b.fsm.SetNoopRestore(true) @@ -463,10 +478,10 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, raftTLSKeyring *RaftTLSK if b.streamLayer != nil { // Add Handler to the cluster. - clusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer) + opts.ClusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer) // Add Client to the cluster. - clusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer) + opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer) } return nil @@ -851,8 +866,10 @@ func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-ch leaderLost := make(chan struct{}) go func() { select { - case <-leaderNotifyCh: - close(leaderLost) + case isLeader := <-leaderNotifyCh: + if !isLeader { + close(leaderLost) + } case <-stopCh: } }() diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index c05f35d8fb..b7260e75e4 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -60,7 +60,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str t.Fatal(err) } - err = backend.SetupCluster(context.Background(), nil, nil) + err = backend.SetupCluster(context.Background(), SetupOpts{}) if err != nil { t.Fatal(err) } @@ -259,9 +259,9 @@ func TestRaft_Recovery(t *testing.T) { } // Bring up the nodes again - raft1.SetupCluster(context.Background(), nil, nil) - raft2.SetupCluster(context.Background(), nil, nil) - raft4.SetupCluster(context.Background(), nil, nil) + raft1.SetupCluster(context.Background(), SetupOpts{}) + raft2.SetupCluster(context.Background(), SetupOpts{}) + raft4.SetupCluster(context.Background(), SetupOpts{}) peers, err := raft1.Peers(context.Background()) if err != nil { diff --git a/physical/raft/snapshot_test.go b/physical/raft/snapshot_test.go index 57ed3eba4b..77bae86ddd 100644 --- a/physical/raft/snapshot_test.go +++ b/physical/raft/snapshot_test.go @@ -38,7 +38,7 @@ func addPeer(t *testing.T, leader, follower *RaftBackend) { t.Fatal(err) } - err = follower.SetupCluster(context.Background(), nil, nil) + err = follower.SetupCluster(context.Background(), SetupOpts{}) if err != nil { t.Fatal(err) } @@ -327,7 +327,7 @@ func TestRaft_Snapshot_Restart(t *testing.T) { } // Start Raft - err = raft1.SetupCluster(context.Background(), nil, nil) + err = raft1.SetupCluster(context.Background(), SetupOpts{}) if err != nil { t.Fatal(err) } diff --git a/physical/raft/streamlayer.go b/physical/raft/streamlayer.go index ec04b31616..f8e76e20cc 100644 --- a/physical/raft/streamlayer.go +++ b/physical/raft/streamlayer.go @@ -26,8 +26,8 @@ import ( "github.com/hashicorp/vault/vault/cluster" ) -// RaftTLSKey is a single TLS keypair in the Keyring -type RaftTLSKey struct { +// TLSKey is a single TLS keypair in the Keyring +type TLSKey struct { // ID is a unique identifier for this Key ID string `json:"id"` @@ -52,12 +52,12 @@ type RaftTLSKey struct { parsedKey *ecdsa.PrivateKey } -// RaftTLSKeyring is the set of keys that raft uses for network communication. +// TLSKeyring is the set of keys that raft uses for network communication. // Only one key is used to dial at a time but both keys will be used to accept // connections. -type RaftTLSKeyring struct { +type TLSKeyring struct { // Keys is the set of available key pairs - Keys []*RaftTLSKey `json:"keys"` + Keys []*TLSKey `json:"keys"` // AppliedIndex is the earliest known raft index that safely contains the // latest key in the keyring. @@ -73,7 +73,7 @@ type RaftTLSKeyring struct { } // GetActive returns the active key. -func (k *RaftTLSKeyring) GetActive() *RaftTLSKey { +func (k *TLSKeyring) GetActive() *TLSKey { if k.ActiveKeyID == "" { return nil } @@ -86,7 +86,7 @@ func (k *RaftTLSKeyring) GetActive() *RaftTLSKey { return nil } -func GenerateTLSKey() (*RaftTLSKey, error) { +func GenerateTLSKey() (*TLSKey, error) { key, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader) if err != nil { return nil, err @@ -120,7 +120,7 @@ func GenerateTLSKey() (*RaftTLSKey, error) { return nil, errwrap.Wrapf("unable to generate local cluster certificate: {{err}}", err) } - return &RaftTLSKey{ + return &TLSKey{ ID: host, KeyType: certutil.PrivateKeyTypeP521, CertBytes: certBytes, @@ -161,13 +161,13 @@ type raftLayer struct { dialerFunc func(string, time.Duration) (net.Conn, error) // TLS config - keyring *RaftTLSKeyring + keyring *TLSKeyring baseTLSConfig *tls.Config } // NewRaftLayer creates a new raftLayer object. It parses the TLS information // from the network config. -func NewRaftLayer(logger log.Logger, raftTLSKeyring *RaftTLSKeyring, clusterAddr net.Addr, baseTLSConfig *tls.Config) (*raftLayer, error) { +func NewRaftLayer(logger log.Logger, raftTLSKeyring *TLSKeyring, clusterAddr net.Addr, baseTLSConfig *tls.Config) (*raftLayer, error) { switch { case clusterAddr == nil: // Clustering disabled on the server, don't try to look for params @@ -189,7 +189,7 @@ func NewRaftLayer(logger log.Logger, raftTLSKeyring *RaftTLSKeyring, clusterAddr return layer, nil } -func (l *raftLayer) setTLSKeyring(keyring *RaftTLSKeyring) error { +func (l *raftLayer) setTLSKeyring(keyring *TLSKeyring) error { // Fast path a noop update if l.keyring != nil && l.keyring.Term == keyring.Term { return nil diff --git a/vault/init.go b/vault/init.go index dc083d6bc3..72408a7850 100644 --- a/vault/init.go +++ b/vault/init.go @@ -11,8 +11,6 @@ import ( "github.com/hashicorp/vault/physical/raft" - "github.com/hashicorp/vault/sdk/logical" - "github.com/hashicorp/errwrap" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/pgpkeys" @@ -150,16 +148,18 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes if err != nil { return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err) } - if err := c.underlyingPhysical.(*raft.RaftBackend).Bootstrap(ctx, []raft.Peer{ + if err := raftStorage.Bootstrap(ctx, []raft.Peer{ { - ID: c.underlyingPhysical.(*raft.RaftBackend).NodeID(), + ID: raftStorage.NodeID(), Address: parsedClusterAddr.Host, }, }); err != nil { return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err) } - if err := raftStorage.SetupCluster(ctx, nil, nil); err != nil { + if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{ + StartAsLeader: true, + }); err != nil { return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err) } @@ -297,24 +297,9 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes results.RootToken = base64.StdEncoding.EncodeToString(encryptedVals[0]) } - if _, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { - raftTLS, err := raft.GenerateTLSKey() - if err != nil { - return nil, err - } - - keyring := &raft.RaftTLSKeyring{ - Keys: []*raft.RaftTLSKey{raftTLS}, - ActiveKeyID: raftTLS.ID, - } - - entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) - if err != nil { - return nil, err - } - if err := c.barrier.Put(ctx, entry); err != nil { - return nil, err - } + if err := c.createRaftTLSKeyring(ctx); err != nil { + c.logger.Error("failed to create raft TLS keyring", "error", err) + return nil, err } // Prepare to re-seal diff --git a/vault/logical_system_raft.go b/vault/logical_system_raft.go index 7a5be66020..f2eac7df9c 100644 --- a/vault/logical_system_raft.go +++ b/vault/logical_system_raft.go @@ -256,7 +256,7 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc if tlsKeyringEntry == nil { return nil, errors.New("could not find raft TLS configuration") } - var keyring raft.RaftTLSKeyring + var keyring raft.TLSKeyring if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { return nil, errors.New("could not decode raft TLS configuration") } diff --git a/vault/raft.go b/vault/raft.go index 31350eb101..0ac3c1edd2 100644 --- a/vault/raft.go +++ b/vault/raft.go @@ -78,7 +78,7 @@ func (s *raftFollowerStates) minIndex() uint64 { // startRaftStorage will call SetupCluster in the raft backend which starts raft // up and enables the cluster handler. -func (c *Core) startRaftStorage(ctx context.Context) error { +func (c *Core) startRaftStorage(ctx context.Context) (retErr error) { raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) if !ok { return nil @@ -92,20 +92,69 @@ func (c *Core) startRaftStorage(ctx context.Context) error { if err != nil { return err } - if raftTLSEntry == nil { - return errors.New("could not find raft TLS configuration") - } - raftTLS := new(raft.RaftTLSKeyring) - if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil { - return err + var creating bool + var raftTLS *raft.TLSKeyring + switch raftTLSEntry { + case nil: + // If we did not find a TLS keyring we will attempt to create one here. + // This happens after a storage migration process. This node is also + // marked to start as leader so we can write the new TLS Key. This is an + // error condition if there are already multiple nodes in the cluster, + // and the below storage write will fail. If the cluster is somehow in + // this state the unseal will fail and a cluster recovery will need to + // be done. + creating = true + raftTLSKey, err := raft.GenerateTLSKey() + if err != nil { + return err + } + + raftTLS = &raft.TLSKeyring{ + Keys: []*raft.TLSKey{raftTLSKey}, + ActiveKeyID: raftTLSKey.ID, + } + default: + raftTLS = new(raft.TLSKeyring) + if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil { + return err + } } raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) - if err := raftStorage.SetupCluster(ctx, raftTLS, c.clusterListener); err != nil { + if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{ + TLSKeyring: raftTLS, + ClusterListener: c.clusterListener, + StartAsLeader: creating, + }); err != nil { return err } + defer func() { + if retErr != nil { + c.logger.Info("stopping raft server") + if err := raftStorage.TeardownCluster(c.clusterListener); err != nil { + c.logger.Error("failed to stop raft server", "error", err) + } + } + }() + + // If we are in need of creating the TLS keyring then we should write it out + // to storage here. If we fail it may mean we couldn't become leader and we + // should error out. + if creating { + c.logger.Info("writing raft TLS keyring to storage") + entry, err := logical.StorageEntryJSON(raftTLSStoragePath, raftTLS) + if err != nil { + c.logger.Error("error marshaling raft TLS keyring", "error", err) + return err + } + if err := c.barrier.Put(ctx, entry); err != nil { + c.logger.Error("error writing raft TLS keyring", "error", err) + return err + } + } + return nil } @@ -164,7 +213,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { c.raftTLSRotationStopCh = stopCh c.raftFollowerStates = followerStates - readKeyring := func() (*raft.RaftTLSKeyring, error) { + readKeyring := func() (*raft.TLSKeyring, error) { tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) if err != nil { return nil, err @@ -172,7 +221,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { if tlsKeyringEntry == nil { return nil, errors.New("no keyring found") } - var keyring raft.RaftTLSKeyring + var keyring raft.TLSKeyring if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { return nil, err } @@ -345,6 +394,31 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { return nil } +func (c *Core) createRaftTLSKeyring(ctx context.Context) error { + if _, ok := c.underlyingPhysical.(*raft.RaftBackend); !ok { + return nil + } + + raftTLS, err := raft.GenerateTLSKey() + if err != nil { + return err + } + + keyring := &raft.TLSKeyring{ + Keys: []*raft.TLSKey{raftTLS}, + ActiveKeyID: raftTLS.ID, + } + + entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) + if err != nil { + return err + } + if err := c.barrier.Put(ctx, entry); err != nil { + return err + } + return nil +} + func (c *Core) stopPeriodicRaftTLSRotate() { if c.raftTLSRotationStopCh != nil { close(c.raftTLSRotationStopCh) @@ -367,7 +441,7 @@ func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error { return nil } - var keyring raft.RaftTLSKeyring + var keyring raft.TLSKeyring if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { return err } @@ -639,7 +713,10 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, } raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) - err = raftStorage.SetupCluster(ctx, answerResp.Data.TLSKeyring, c.clusterListener) + err = raftStorage.SetupCluster(ctx, raft.SetupOpts{ + TLSKeyring: answerResp.Data.TLSKeyring, + ClusterListener: c.clusterListener, + }) if err != nil { return errwrap.Wrapf("failed to setup raft cluster: {{err}}", err) } @@ -656,6 +733,6 @@ type answerRespData struct { } type answerResp struct { - Peers []raft.Peer `json:"peers"` - TLSKeyring *raft.RaftTLSKeyring `json:"tls_keyring"` + Peers []raft.Peer `json:"peers"` + TLSKeyring *raft.TLSKeyring `json:"tls_keyring"` }