From 68750b70a2c139c68b4ea2aeecfd1317fef653ef Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Fri, 11 Oct 2019 11:56:59 -0700 Subject: [PATCH] OSS portions of raft non-voters (#7634) * OSS portions of raft non-voters * add file * Update vault/raft.go Co-Authored-By: Vishal Nayak --- api/sys_raft.go | 1 + command/operator_raft_join.go | 15 ++++++++++--- helper/testhelpers/testhelpers.go | 4 ++-- http/sys_raft.go | 8 ++++++- http/util.go | 2 ++ physical/raft/raft.go | 1 - physical/raft/raft_util.go | 13 ++++++++++++ vault/core.go | 26 +++++++++++------------ vault/external_tests/raft/raft_test.go | 2 +- vault/logical_system_raft.go | 14 ++++++++++++- vault/raft.go | 29 ++++++++++++++------------ 11 files changed, 80 insertions(+), 35 deletions(-) create mode 100644 physical/raft/raft_util.go diff --git a/api/sys_raft.go b/api/sys_raft.go index 6897dc0a7e..5057541f57 100644 --- a/api/sys_raft.go +++ b/api/sys_raft.go @@ -20,6 +20,7 @@ type RaftJoinRequest struct { LeaderClientCert string `json:"leader_client_cert"` LeaderClientKey string `json:"leader_client_key"` Retry bool `json:"retry"` + NonVoter bool `json:"non_voter"` } // RaftJoin adds the node from which this call is invoked from to the raft diff --git a/command/operator_raft_join.go b/command/operator_raft_join.go index 528007c66f..bc05ec930b 100644 --- a/command/operator_raft_join.go +++ b/command/operator_raft_join.go @@ -13,10 +13,11 @@ var _ cli.Command = (*OperatorRaftJoinCommand)(nil) var _ cli.CommandAutocomplete = (*OperatorRaftJoinCommand)(nil) type OperatorRaftJoinCommand struct { - flagRaftRetry bool + flagRetry bool flagLeaderCACert string flagLeaderClientCert string flagLeaderClientKey string + flagNonVoter bool *BaseCommand } @@ -66,11 +67,18 @@ func (c *OperatorRaftJoinCommand) Flags() *FlagSets { f.BoolVar(&BoolVar{ Name: "retry", - Target: &c.flagRaftRetry, + Target: &c.flagRetry, Default: false, Usage: "Continuously retry joining the raft cluster upon failures.", }) + f.BoolVar(&BoolVar{ 
+ Name: "non-voter", + Target: &c.flagNonVoter, + Default: false, + Usage: "(Enterprise-only) This flag is used to make the server not participate in the Raft quorum, and have it only receive the data replication stream. This can be used to add read scalability to a cluster in cases where a high volume of reads to servers are needed.", + }) + return set } @@ -117,7 +125,8 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int { LeaderCACert: c.flagLeaderCACert, LeaderClientCert: c.flagLeaderClientCert, LeaderClientKey: c.flagLeaderClientKey, - Retry: c.flagRaftRetry, + Retry: c.flagRetry, + NonVoter: c.flagNonVoter, }) if err != nil { c.UI.Error(fmt.Sprintf("Error joining the node to the raft cluster: %s", err)) diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go index 6fde258df6..79bb2f1292 100644 --- a/helper/testhelpers/testhelpers.go +++ b/helper/testhelpers/testhelpers.go @@ -328,7 +328,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) { { core := cluster.Cores[1] core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) - _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false) + _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false) if err != nil { t.Fatal(err) } @@ -340,7 +340,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) { { core := cluster.Cores[2] core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) - _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false) + _, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false) if err != nil { t.Fatal(err) } diff --git a/http/sys_raft.go b/http/sys_raft.go index 214c23a6f7..78b411f934 100644 --- a/http/sys_raft.go +++ 
b/http/sys_raft.go @@ -3,6 +3,7 @@ package http import ( "context" "crypto/tls" + "errors" "io" "net/http" @@ -29,6 +30,10 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ return } + if req.NonVoter && !nonVotersAllowed { + respondError(w, http.StatusBadRequest, errors.New("non-voting nodes not allowed")) + } + var tlsConfig *tls.Config var err error if len(req.LeaderCACert) != 0 || len(req.LeaderClientCert) != 0 || len(req.LeaderClientKey) != 0 { @@ -39,7 +44,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ } } - joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry) + joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry, req.NonVoter) if err != nil { respondError(w, http.StatusInternalServerError, err) return @@ -61,4 +66,5 @@ type JoinRequest struct { LeaderClientCert string `json:"leader_client_cert"` LeaderClientKey string `json:"leader_client_key"` Retry bool `json:"retry"` + NonVoter bool `json:"non_voter"` } diff --git a/http/util.go b/http/util.go index ee7c546cd9..ff50a4838b 100644 --- a/http/util.go +++ b/http/util.go @@ -19,4 +19,6 @@ var ( } additionalRoutes = func(mux *http.ServeMux, core *vault.Core) {} + + nonVotersAllowed = false ) diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 329d61e0e5..e8f68fb7a1 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -575,7 +575,6 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e b.logger.Debug("adding raft peer", "node_id", peerID, "cluster_addr", clusterAddr) future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0) - return future.Error() } diff --git a/physical/raft/raft_util.go b/physical/raft/raft_util.go new file mode 100644 index 0000000000..0223f834ea --- /dev/null +++ b/physical/raft/raft_util.go @@ -0,0 +1,13 @@ +// +build !enterprise + +package raft 
+ + import ( + "context" + "errors" +) + +// AddNonVotingPeer is the non-enterprise stub for adding a non-voting server to the raft cluster; it always returns an error +func (b *RaftBackend) AddNonVotingPeer(ctx context.Context, peerID, clusterAddr string) error { + return errors.New("not implemented") +} diff --git a/vault/core.go b/vault/core.go index 61ba2fda0d..3d2c54481c 100644 --- a/vault/core.go +++ b/vault/core.go @@ -152,6 +152,13 @@ type unlockInformation struct { Nonce string } +type raftInformation struct { + challenge *physical.EncryptedBlobInfo + leaderClient *api.Client + leaderBarrierConfig *SealConfig + nonVoter bool +} + // Core is used as the central manager of Vault activity. It is the primary point of // interface for API handlers and is responsible for managing the logical and physical // backends, router, security barrier, and audit trails. @@ -189,13 +196,9 @@ type Core struct { // seal is our seal, for seal configuration information seal Seal - raftUnseal bool - - raftChallenge *physical.EncryptedBlobInfo - - raftLeaderClient *api.Client - - raftLeaderBarrierConfig *SealConfig + // raftInfo will contain information required for this node to join as a + // peer to an existing raft cluster + raftInfo *raftInformation // migrationSeal is the seal to use during a migration operation. It is the // seal we're migrating *from*. @@ -923,14 +926,11 @@ func (c *Core) unseal(key []byte, useRecoveryKeys bool) (bool, error) { // If we are in the middle of a raft join send the answer and wait for // data to start streaming in. 
- if err := c.joinRaftSendAnswer(ctx, c.raftLeaderClient, c.raftChallenge, c.seal.GetAccess()); err != nil { + if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil { return false, err } // Reset the state - c.raftUnseal = false - c.raftChallenge = nil - c.raftLeaderBarrierConfig = nil - c.raftLeaderClient = nil + c.raftInfo = nil go func() { keyringFound := false @@ -1002,7 +1002,7 @@ func (c *Core) unsealPart(ctx context.Context, seal Seal, key []byte, useRecover case c.isRaftUnseal(): // Ignore follower's seal config and refer to leader's barrier // configuration. - config = c.raftLeaderBarrierConfig + config = c.raftInfo.leaderBarrierConfig default: config, err = seal.BarrierConfig(ctx) } diff --git a/vault/external_tests/raft/raft_test.go b/vault/external_tests/raft/raft_test.go index b4bc8d8109..01c12deafa 100644 --- a/vault/external_tests/raft/raft_test.go +++ b/vault/external_tests/raft/raft_test.go @@ -1,4 +1,4 @@ -package vault +package rafttests import ( "bytes" diff --git a/vault/logical_system_raft.go b/vault/logical_system_raft.go index 2edca49b1f..18c1c16b68 100644 --- a/vault/logical_system_raft.go +++ b/vault/logical_system_raft.go @@ -33,6 +33,9 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path { "cluster_addr": { Type: framework.TypeString, }, + "non_voter": { + Type: framework.TypeBool, + }, }, Operations: map[logical.Operation]framework.OperationHandler{ @@ -233,6 +236,8 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc return logical.ErrorResponse("no cluster_addr provided"), logical.ErrInvalidRequest } + nonVoter := d.Get("non_voter").(bool) + answer, err := base64.StdEncoding.DecodeString(answerRaw) if err != nil { return logical.ErrorResponse("could not base64 decode answer"), logical.ErrInvalidRequest @@ -261,9 +266,16 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc return nil, errors.New("could not decode raft TLS configuration") 
} - if err := raftStorage.AddPeer(ctx, serverID, clusterAddr); err != nil { + switch nonVoter { + case true: + err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr) + default: + err = raftStorage.AddPeer(ctx, serverID, clusterAddr) + } + if err != nil { return nil, err } + if b.Core.raftFollowerStates != nil { b.Core.raftFollowerStates.update(serverID, 0) } diff --git a/vault/raft.go b/vault/raft.go index b86ab51d7a..562179a850 100644 --- a/vault/raft.go +++ b/vault/raft.go @@ -524,7 +524,7 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co } } -func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry bool) (bool, error) { +func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry, nonVoter bool) (bool, error) { if len(leaderAddr) == 0 { return false, errors.New("No leader address provided") } @@ -603,17 +603,19 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig if err := proto.Unmarshal(challengeRaw, eBlob); err != nil { return errwrap.Wrapf("error decoding challenge: {{err}}", err) } - + raftInfo := &raftInformation{ + challenge: eBlob, + leaderClient: apiClient, + leaderBarrierConfig: &sealConfig, + nonVoter: nonVoter, + } if c.seal.BarrierType() == seal.Shamir { - c.raftUnseal = true - c.raftChallenge = eBlob - c.raftLeaderClient = apiClient - c.raftLeaderBarrierConfig = &sealConfig + c.raftInfo = raftInfo c.seal.SetBarrierConfig(ctx, &sealConfig) return nil } - if err := c.joinRaftSendAnswer(ctx, apiClient, eBlob, c.seal.GetAccess()); err != nil { + if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil { return errwrap.Wrapf("failed to send answer to leader node: {{err}}", err) } @@ -649,8 +651,8 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig // This is used in tests to override the cluster address var UpdateClusterAddrForTests uint32 -func (c 
*Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, challenge *physical.EncryptedBlobInfo, sealAccess seal.Access) error { - if challenge == nil { +func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, raftInfo *raftInformation) error { + if raftInfo.challenge == nil { return errors.New("raft challenge is nil") } @@ -663,7 +665,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, return errors.New("raft is already initialized") } - plaintext, err := sealAccess.Decrypt(ctx, challenge) + plaintext, err := sealAccess.Decrypt(ctx, raftInfo.challenge) if err != nil { return errwrap.Wrapf("error decrypting challenge: {{err}}", err) } @@ -683,16 +685,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, } } - answerReq := leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer") + answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer") if err := answerReq.SetJSONBody(map[string]interface{}{ "answer": base64.StdEncoding.EncodeToString(plaintext), "cluster_addr": clusterAddr, "server_id": raftStorage.NodeID(), + "non_voter": raftInfo.nonVoter, }); err != nil { return err } - answerRespJson, err := leaderClient.RawRequestWithContext(ctx, answerReq) + answerRespJson, err := raftInfo.leaderClient.RawRequestWithContext(ctx, answerReq) if answerRespJson != nil { defer answerRespJson.Body.Close() } @@ -725,7 +728,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, } func (c *Core) isRaftUnseal() bool { - return c.raftUnseal + return c.raftInfo != nil } type answerRespData struct {