From d177fc7dd1aa7beda8b1c5bf5a8a4e85d725cca5 Mon Sep 17 00:00:00 2001 From: ncabatoff Date: Fri, 26 Jul 2019 16:42:51 -0400 Subject: [PATCH] Generalize and improve testcluster-building code (#7177) There are a few different things happening in this change. First, some code that previously lived in enterprise has moved here: this includes some helper code for manipulating clusters and for building storage backends. Second, the existing cluster-building code using inmem storage has been generalized to allow various storage backends. Third, added support for creating two-cluster DR setups. Finally, there are tweaks to handle edge cases that result in intermittent failures, or to eliminate sleeps in favour of polling to detect state changes. Also: generalize TestClusterOptions.PhysicalFactory so it can be used either as a per-core factory (for raft) or a per-cluster factory (for other storage backends.) --- helper/testhelpers/testhelpers.go | 623 +++++++++++++++++-------- vault/external_tests/raft/raft_test.go | 296 ++---------- vault/testing.go | 72 ++- 3 files changed, 518 insertions(+), 473 deletions(-) diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go index 01b6dfa915..d636a68c50 100644 --- a/helper/testhelpers/testhelpers.go +++ b/helper/testhelpers/testhelpers.go @@ -3,6 +3,7 @@ package testhelpers import ( "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -14,28 +15,30 @@ import ( "strings" "sync" "sync/atomic" + realtesting "testing" "time" - "github.com/hashicorp/go-uuid" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/vault/physical/raft" - "github.com/hashicorp/vault/sdk/helper/logging" - "github.com/hashicorp/vault/sdk/logical" - "github.com/hashicorp/vault/vault/cluster" - - log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" raftlib "github.com/hashicorp/raft" "github.com/hashicorp/vault/api" credAppRole "github.com/hashicorp/vault/builtin/credential/approle" + "github.com/hashicorp/vault/builtin/credential/ldap" credUserpass "github.com/hashicorp/vault/builtin/credential/userpass" "github.com/hashicorp/vault/helper/namespace" + "github.com/hashicorp/vault/helper/testhelpers/consul" "github.com/hashicorp/vault/helper/xor" + physConsul "github.com/hashicorp/vault/physical/consul" + "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/consts" + "github.com/hashicorp/vault/sdk/helper/logging" + "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/physical" + physFile "github.com/hashicorp/vault/sdk/physical/file" "github.com/hashicorp/vault/sdk/physical/inmem" "github.com/hashicorp/vault/vault" - testing "github.com/mitchellh/go-testing-interface" + "github.com/hashicorp/vault/vault/cluster" + "github.com/mitchellh/go-testing-interface" ) type ReplicatedTestClusters struct { @@ -176,11 +179,17 @@ func EnsureCoresUnsealed(t testing.T, c *vault.TestCluster) { EnsureCoreUnsealed(t, c, core) } } + func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClusterCore) { if !core.Sealed() { return } + core.SealAccess().ClearCaches(context.Background()) + if err := core.UnsealWithStoredKeys(context.Background()); err != nil { + t.Fatal(err) + } + client := core.Client client.Sys().ResetUnsealProcess() for j := 0; j < len(c.BarrierKeys); j++ { @@ -207,7 +216,7 @@ func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClust func EnsureCoreIsPerfStandby(t testing.T, client *api.Client) { t.Helper() - logger := 
logging.NewVaultLogger(log.Info).Named(t.Name()) + logger := logging.NewVaultLogger(hclog.Info).Named(t.Name()) start := time.Now() for { health, err := client.Sys().Health() @@ -262,215 +271,239 @@ func PassthroughWithLocalPathsFactory(ctx context.Context, c *logical.BackendCon } func ConfClusterAndCore(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) (*vault.TestCluster, *vault.TestClusterCore) { - if conf.Physical != nil || conf.HAPhysical != nil { - t.Fatalf("conf.Physical and conf.HAPhysical cannot be specified") - } - if opts.Logger == nil { - t.Fatalf("opts.Logger must be specified") + { + var coreConfig vault.CoreConfig + if conf != nil { + coreConfig = *conf + } + conf = &coreConfig } - inm, err := inmem.NewTransactionalInmem(nil, opts.Logger) - if err != nil { - t.Fatal(err) - } - inmha, err := inmem.NewInmemHA(nil, opts.Logger) - if err != nil { - t.Fatal(err) - } - - coreConfig := *conf - coreConfig.Physical = inm - coreConfig.HAPhysical = inmha.(physical.HABackend) - coreConfig.CredentialBackends = map[string]logical.Factory{ + conf.CredentialBackends = map[string]logical.Factory{ "approle": credAppRole.Factory, "userpass": credUserpass.Factory, + "ldap": ldap.Factory, } - vault.AddNoopAudit(&coreConfig) - cluster := vault.NewTestCluster(t, &coreConfig, opts) + + opts = getClusterDefaultsOpts(t, opts, "") + + vault.AddNoopAudit(conf) + + cluster := vault.NewTestCluster(t, conf, opts) cluster.Start() - - cores := cluster.Cores - core := cores[0] - - vault.TestWaitActive(t, core.Core) - - return cluster, core + vault.TestWaitActive(t, cluster.Cores[0].Core) + return cluster, cluster.Cores[0] } +// GetPerfReplicatedClusters returns a ReplicatedTestClusters containing both +// a perf primary and a perf secondary cluster, with replication enabled. func GetPerfReplicatedClusters(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { - ret := &ReplicatedTestClusters{} + rc := PrepPerfReplicatedClusters(t, conf, opts) + rc.SetupTwoClusterPerfReplication(t, false) + return rc +} - var logger hclog.Logger - if opts != nil { - logger = opts.Logger +// getClusterDefaultsOpts returns a non-nil TestClusterOptions, based on opts +// if it is non-nil. The Logger option will be populated. If name is given, +// the logger will be created using the Named logger method, such that the string +// will appear as part of every log entry. +func getClusterDefaultsOpts(t testing.T, opts *vault.TestClusterOptions, name string) *vault.TestClusterOptions { + if opts == nil { + opts = &vault.TestClusterOptions{} } - if logger == nil { - logger = log.New(&log.LoggerOptions{ - Mutex: &sync.Mutex{}, - Level: log.Trace, - }) + + localOpts := *opts + opts = &localOpts + + if opts.Logger == nil { + opts.Logger = logging.NewVaultLogger(hclog.Trace).Named(t.Name()) } + if name != "" { + opts.Logger = opts.Logger.Named(name) + } + if opts.PhysicalFactory == nil { + opts.PhysicalFactory = sharedPhysicalFactory(MakeInmemBackend) + } + + return opts +} + +// GetPerfPrimaryCluster returns a ReplicatedTestClusters containing only a +// single cluster. Normally you would use NewTestCluster directly, but this +// helper may make sense if you want to test cluster replication but first do +// something with a standalone cluster. 
+func GetPerfPrimaryCluster(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { + opts = getClusterDefaultsOpts(t, opts, "") + ret := &ReplicatedTestClusters{} // Set this lower so that state populates quickly to standby nodes cluster.HeartbeatInterval = 2 * time.Second - numCores := opts.NumCores - if numCores == 0 { - numCores = vault.DefaultNumCores - } - - localopts := *opts - localopts.Logger = logger.Named("perf-pri") - ret.PerfPrimaryCluster, _ = ConfClusterAndCore(t, conf, &localopts) - - localopts.Logger = logger.Named("perf-sec") - localopts.FirstCoreNumber += numCores - ret.PerfSecondaryCluster, _ = ConfClusterAndCore(t, conf, &localopts) - - SetupTwoClusterPerfReplication(t, ret.PerfPrimaryCluster, ret.PerfSecondaryCluster) - + ret.PerfPrimaryCluster, _ = ConfClusterAndCore(t, conf, getClusterDefaultsOpts(t, opts, "perf-pri")) return ret } +// AddPerfSecondaryCluster spins up a Perf Secondary cluster and adds it to +// the receiver. Replication is not enabled. +func (r *ReplicatedTestClusters) AddPerfSecondaryCluster(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + if r.PerfSecondaryCluster != nil { + t.Fatal("adding a perf secondary cluster when one is already present") + } + opts = getClusterDefaultsOpts(t, opts, "perf-sec") + opts.FirstCoreNumber += len(r.PerfPrimaryCluster.Cores) + r.PerfSecondaryCluster, _ = ConfClusterAndCore(t, conf, opts) +} + +// PrepPerfReplicatedClusters returns a ReplicatedTestClusters containing both +// a perf primary and a perf secondary cluster. Replication is not enabled. +func PrepPerfReplicatedClusters(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { + ret := GetPerfPrimaryCluster(t, conf, opts) + ret.AddPerfSecondaryCluster(t, conf, opts) + return ret +} + +// GetFourReplicatedClusters returns an inmem ReplicatedTestClusters with all +// clusters populated and replication enabled. func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters { return GetFourReplicatedClustersWithConf(t, &vault.CoreConfig{}, &vault.TestClusterOptions{ HandlerFunc: handlerFunc, }) } +// GetFourReplicatedClustersWithConf returns a ReplicatedTestClusters with all +// clusters populated and replication enabled. 
func GetFourReplicatedClustersWithConf(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { ret := &ReplicatedTestClusters{} - logger := log.New(&log.LoggerOptions{ - Mutex: &sync.Mutex{}, - Level: log.Trace, - }) + opts = getClusterDefaultsOpts(t, opts, "") // Set this lower so that state populates quickly to standby nodes cluster.HeartbeatInterval = 2 * time.Second - numCores := opts.NumCores - if numCores == 0 { - numCores = vault.DefaultNumCores - } - localopts := *opts - localopts.Logger = logger.Named("perf-pri") + localopts.Logger = opts.Logger.Named("perf-pri") ret.PerfPrimaryCluster, _ = ConfClusterAndCore(t, conf, &localopts) - localopts.Logger = logger.Named("perf-sec") - localopts.FirstCoreNumber += numCores + localopts.Logger = opts.Logger.Named("perf-sec") + localopts.FirstCoreNumber += len(ret.PerfPrimaryCluster.Cores) ret.PerfSecondaryCluster, _ = ConfClusterAndCore(t, conf, &localopts) - localopts.Logger = logger.Named("perf-pri-dr") - localopts.FirstCoreNumber += numCores + localopts.Logger = opts.Logger.Named("perf-pri-dr") + localopts.FirstCoreNumber += len(ret.PerfSecondaryCluster.Cores) ret.PerfPrimaryDRCluster, _ = ConfClusterAndCore(t, conf, &localopts) - localopts.Logger = logger.Named("perf-sec-dr") - localopts.FirstCoreNumber += numCores + localopts.Logger = opts.Logger.Named("perf-sec-dr") + localopts.FirstCoreNumber += len(ret.PerfPrimaryDRCluster.Cores) ret.PerfSecondaryDRCluster, _ = ConfClusterAndCore(t, conf, &localopts) - builder := &ReplicatedTestClustersBuilder{clusters: ret} - builder.setupFourClusterReplication(t) - - // Wait until poison pills have been read - time.Sleep(45 * time.Second) - EnsureCoresUnsealed(t, ret.PerfPrimaryCluster) - EnsureCoresUnsealed(t, ret.PerfSecondaryCluster) - EnsureCoresUnsealed(t, ret.PerfPrimaryDRCluster) - EnsureCoresUnsealed(t, ret.PerfSecondaryDRCluster) + SetupFourClusterReplication(t, ret.PerfPrimaryCluster, ret.PerfSecondaryCluster, ret.PerfPrimaryDRCluster, ret.PerfSecondaryDRCluster) return ret } -type ReplicatedTestClustersBuilder struct { - clusters *ReplicatedTestClusters - perfToken string - drToken string - perfSecondaryRootToken string - perfSecondaryDRToken string +func (r *ReplicatedTestClusters) SetupTwoClusterPerfReplication(t testing.T, maskSecondaryToken bool) { + SetupTwoClusterPerfReplication(t, r.PerfPrimaryCluster, r.PerfSecondaryCluster, maskSecondaryToken) } -func SetupTwoClusterPerfReplication(t testing.T, pri, sec *vault.TestCluster) { - clusters := &ReplicatedTestClusters{ - PerfPrimaryCluster: pri, - PerfSecondaryCluster: sec, +func SetupTwoClusterPerfReplication(t testing.T, pri, sec *vault.TestCluster, maskSecondaryToken bool) { + EnablePerfPrimary(t, pri) + + var publicKey string + if maskSecondaryToken { + publicKey = generatePublicKey(t, sec) } - builder := &ReplicatedTestClustersBuilder{clusters: clusters} - builder.setupTwoClusterReplication(t) + perfToken := GetPerformanceToken(t, pri, sec.ID, publicKey) + + EnablePerformanceSecondary(t, perfToken, pri, sec, false, false) } -func (r *ReplicatedTestClustersBuilder) setupTwoClusterReplication(t testing.T) { - t.Log("enabling perf primary") - r.enablePerfPrimary(t) - WaitForActiveNode(t, r.clusters.PerfPrimaryCluster) - r.getPerformanceToken(t) - t.Log("enabling perf secondary") - r.enablePerformanceSecondary(t) +func GetDRReplicatedClusters(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { + clusters := PrepDRReplicatedClusters(t, conf, opts) + 
SetupTwoClusterDRReplication(t, clusters.PerfPrimaryCluster, clusters.PerfPrimaryDRCluster, false) + return clusters +} + +func (r *ReplicatedTestClusters) AddDRSecondaryCluster(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts = getClusterDefaultsOpts(t, opts, "perf-dr-pri") + opts.FirstCoreNumber += len(r.PerfPrimaryCluster.Cores) + r.PerfPrimaryDRCluster, _ = ConfClusterAndCore(t, conf, opts) +} + +func PrepDRReplicatedClusters(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters { + ret := GetPerfPrimaryCluster(t, conf, opts) + ret.AddDRSecondaryCluster(t, conf, opts) + return ret +} + +func SetupTwoClusterDRReplication(t testing.T, pri, sec *vault.TestCluster, maskSecondaryToken bool) { + EnableDrPrimary(t, pri) + setupDRReplication(t, pri, sec, maskSecondaryToken) +} + +func setupDRReplication(t testing.T, pri, sec *vault.TestCluster, maskSecondaryToken bool) { + var publicKey string + if maskSecondaryToken { + publicKey = generatePublicKey(t, sec) + } + drToken := getDrToken(t, pri, sec.ID, publicKey) + + EnableDrSecondary(t, pri, sec, drToken) + for _, core := range sec.Cores { + core.Client.SetToken(pri.Cores[0].Client.Token()) + } + WaitForActiveNode(t, sec) + WaitForMatchingMerkleRoots(t, "sys/replication/dr/", pri.Cores[0].Client, sec.Cores[0].Client) + WaitForDRReplicationWorking(t, pri, sec) } func SetupFourClusterReplication(t testing.T, pri, sec, pridr, secdr *vault.TestCluster) { - clusters := &ReplicatedTestClusters{ - PerfPrimaryCluster: pri, - PerfSecondaryCluster: sec, - PerfPrimaryDRCluster: pridr, - PerfSecondaryDRCluster: secdr, - } - builder := &ReplicatedTestClustersBuilder{clusters: clusters} - builder.setupFourClusterReplication(t) + SetupTwoClusterPerfReplication(t, pri, sec, false) + SetupTwoClusterDRReplication(t, pri, pridr, false) + SetupTwoClusterDRReplication(t, sec, secdr, false) } -func (r *ReplicatedTestClustersBuilder) setupFourClusterReplication(t testing.T) { - t.Log("enabling perf primary") - r.enablePerfPrimary(t) - r.getPerformanceToken(t) - - t.Log("enabling dr primary") - enableDrPrimary(t, r.clusters.PerfPrimaryCluster) - r.drToken = getDrToken(t, r.clusters.PerfPrimaryCluster, "primary-dr-secondary") - WaitForActiveNode(t, r.clusters.PerfPrimaryCluster) - time.Sleep(1 * time.Second) - - t.Log("enabling perf secondary") - r.enablePerformanceSecondary(t) - enableDrPrimary(t, r.clusters.PerfSecondaryCluster) - r.perfSecondaryDRToken = getDrToken(t, r.clusters.PerfSecondaryCluster, "secondary-dr-secondary") - - t.Log("enabling dr secondary on primary dr cluster") - r.enableDrSecondary(t, r.clusters.PerfPrimaryDRCluster, r.drToken, r.clusters.PerfPrimaryCluster.CACertPEMFile) - r.clusters.PerfPrimaryDRCluster.Cores[0].Client.SetToken(r.clusters.PerfPrimaryCluster.Cores[0].Client.Token()) - WaitForActiveNode(t, r.clusters.PerfPrimaryDRCluster) - time.Sleep(1 * time.Second) - - t.Log("enabling dr secondary on secondary dr cluster") - r.enableDrSecondary(t, r.clusters.PerfSecondaryDRCluster, r.perfSecondaryDRToken, r.clusters.PerfSecondaryCluster.CACertPEMFile) - r.clusters.PerfSecondaryDRCluster.Cores[0].Client.SetToken(r.perfSecondaryRootToken) - WaitForActiveNode(t, r.clusters.PerfSecondaryDRCluster) -} - -func (r *ReplicatedTestClustersBuilder) enablePerfPrimary(t testing.T) { - c := r.clusters.PerfPrimaryCluster.Cores[0] +func EnablePerfPrimary(t testing.T, cluster *vault.TestCluster) { + cluster.Logger.Info("enabling performance primary") + c := cluster.Cores[0] _, err := 
c.Client.Logical().Write("sys/replication/performance/primary/enable", nil) if err != nil { t.Fatal(err) } WaitForReplicationState(t, c.Core, consts.ReplicationPerformancePrimary) - WaitForActiveNodeAndPerfStandbys(t, r.clusters.PerfPrimaryCluster) + WaitForActiveNodeAndPerfStandbys(t, cluster) + cluster.Logger.Info("enabled performance primary") } -func (r *ReplicatedTestClustersBuilder) getPerformanceToken(t testing.T) { - client := r.clusters.PerfPrimaryCluster.Cores[0].Client +func generatePublicKey(t testing.T, cluster *vault.TestCluster) string { + generateKeyPath := "sys/replication/performance/secondary/generate-public-key" + secret, err := cluster.Cores[0].Client.Logical().Write(generateKeyPath, nil) + if err != nil { + t.Fatal(err) + } + if secret == nil || secret.Data == nil { + t.Fatal("secret or secret data is nil") + } + + return secret.Data["secondary_public_key"].(string) +} + +func GetPerformanceToken(t testing.T, pri *vault.TestCluster, id, secondaryPublicKey string) string { + client := pri.Cores[0].Client req := map[string]interface{}{ - "id": r.clusters.PerfSecondaryCluster.ID, + "id": id, + } + if secondaryPublicKey != "" { + req["secondary_public_key"] = secondaryPublicKey } secret, err := client.Logical().Write("sys/replication/performance/primary/secondary-token", req) if err != nil { t.Fatal(err) } - r.perfToken = secret.WrapInfo.Token + return secret.WrapInfo.Token } -func enableDrPrimary(t testing.T, tc *vault.TestCluster) { +func EnableDrPrimary(t testing.T, tc *vault.TestCluster) { + tc.Logger.Info("enabling dr primary") c := tc.Cores[0] _, err := c.Client.Logical().Write("sys/replication/dr/primary/enable", nil) if err != nil { @@ -480,12 +513,16 @@ func enableDrPrimary(t testing.T, tc *vault.TestCluster) { WaitForReplicationStatus(t, c.Client, true, func(secret map[string]interface{}) bool { return secret["mode"] != nil && secret["mode"] == "primary" }) + tc.Logger.Info("enabled dr primary") } -func getDrToken(t testing.T, tc *vault.TestCluster, id string) string { +func getDrToken(t testing.T, tc *vault.TestCluster, id, secondaryPublicKey string) string { req := map[string]interface{}{ "id": id, } + if secondaryPublicKey != "" { + req["secondary_public_key"] = secondaryPublicKey + } secret, err := tc.Cores[0].Client.Logical().Write("sys/replication/dr/primary/secondary-token", req) if err != nil { t.Fatal(err) @@ -493,57 +530,89 @@ func getDrToken(t testing.T, tc *vault.TestCluster, id string) string { return secret.WrapInfo.Token } -func (r *ReplicatedTestClustersBuilder) enablePerformanceSecondary(t testing.T) { - c := r.clusters.PerfSecondaryCluster.Cores[0] +func EnablePerformanceSecondary(t testing.T, perfToken string, pri, sec *vault.TestCluster, updatePrimary, skipPoisonPill bool) string { postData := map[string]interface{}{ - "token": r.perfToken, - "ca_file": r.clusters.PerfPrimaryCluster.CACertPEMFile, + "token": perfToken, + "ca_file": pri.CACertPEMFile, } - if r.clusters.PerfPrimaryCluster.ClientAuthRequired { - p := r.clusters.PerfPrimaryCluster.Cores[0] + if pri.ClientAuthRequired { + p := pri.Cores[0] postData["client_cert_pem"] = string(p.ServerCertPEM) postData["client_key_pem"] = string(p.ServerKeyPEM) } - _, err := c.Client.Logical().Write("sys/replication/performance/secondary/enable", postData) + path := "sys/replication/performance/secondary/enable" + if updatePrimary { + path = "sys/replication/performance/secondary/update-primary" + } + _, err := sec.Cores[0].Client.Logical().Write(path, postData) if err != nil { t.Fatal(err) } - 
WaitForReplicationState(t, c.Core, consts.ReplicationPerformanceSecondary) + sec.Logger.Info("enabled perf secondary, waiting for its replication state") + WaitForReplicationState(t, sec.Cores[0].Core, consts.ReplicationPerformanceSecondary) + WaitForMatchingMerkleRootsCore(t, pri.Cores[0], sec.Cores[0], false) - r.clusters.PerfSecondaryCluster.BarrierKeys = r.clusters.PerfPrimaryCluster.BarrierKeys + var perfSecondaryRootToken string + if !updatePrimary { + sec.BarrierKeys = pri.BarrierKeys + if !pri.Cores[0].SealAccess().RecoveryKeySupported() { + sec.RecoveryKeys = pri.BarrierKeys + } else { + sec.RecoveryKeys = pri.RecoveryKeys + } - // We want to make sure we unseal all the nodes so we first need to wait - // until two of the nodes seal due to the poison pill being written - WaitForNCoresSealed(t, r.clusters.PerfSecondaryCluster, 2) - EnsureCoresUnsealed(t, r.clusters.PerfSecondaryCluster) - WaitForActiveNode(t, r.clusters.PerfSecondaryCluster) + if len(sec.Cores) > 1 { + if skipPoisonPill { + // As part of prepareSecondary on the active node the keyring is + // deleted from storage. Its absence can cause standbys to seal + // themselves. But it's not reliable, so we'll seal them + // ourselves to force the issue. + for _, core := range sec.Cores[1:] { + EnsureCoreSealed(t, core) + } + } else { + sec.Logger.Info("waiting for perf secondary standbys to seal") + // We want to make sure we unseal all the nodes so we first need to wait + // until two of the nodes seal due to the poison pill being written + WaitForNCoresSealed(t, sec, len(sec.Cores)-1) + } + } + sec.Logger.Info("waiting for perf secondary standbys to be unsealed") + EnsureCoresUnsealed(t, sec) + sec.Logger.Info("waiting for perf secondary active node") + WaitForActiveNode(t, sec) + sec.Logger.Info("generating new perf secondary root") - r.perfSecondaryRootToken = GenerateRoot(t, r.clusters.PerfSecondaryCluster, false) - for _, core := range r.clusters.PerfSecondaryCluster.Cores { - core.Client.SetToken(r.perfSecondaryRootToken) + perfSecondaryRootToken = GenerateRoot(t, sec, false) + for _, core := range sec.Cores { + core.Client.SetToken(perfSecondaryRootToken) + } + WaitForActiveNodeAndPerfStandbys(t, sec) } - WaitForPerfReplicationWorking(t, r.clusters.PerfPrimaryCluster, r.clusters.PerfSecondaryCluster) + WaitForPerfReplicationWorking(t, pri, sec) + return perfSecondaryRootToken } -func (r *ReplicatedTestClustersBuilder) enableDrSecondary(t testing.T, tc *vault.TestCluster, token, ca_file string) { - _, err := tc.Cores[0].Client.Logical().Write("sys/replication/dr/secondary/enable", map[string]interface{}{ +func EnableDrSecondary(t testing.T, pri, sec *vault.TestCluster, token string) { + sec.Logger.Info("enabling dr secondary") + _, err := sec.Cores[0].Client.Logical().Write("sys/replication/dr/secondary/enable", map[string]interface{}{ "token": token, - "ca_file": ca_file, + "ca_file": pri.CACertPEMFile, }) if err != nil { t.Fatal(err) } - WaitForReplicationState(t, tc.Cores[0].Core, consts.ReplicationDRSecondary) - tc.BarrierKeys = r.clusters.PerfPrimaryCluster.BarrierKeys + WaitForReplicationState(t, sec.Cores[0].Core, consts.ReplicationDRSecondary) + sec.BarrierKeys = pri.BarrierKeys // We want to make sure we unseal all the nodes so we first need to wait // until two of the nodes seal due to the poison pill being written - WaitForNCoresSealed(t, tc, len(tc.Cores)-1) - EnsureCoresUnsealed(t, tc) - WaitForReplicationStatus(t, tc.Cores[0].Client, true, func(secret map[string]interface{}) bool { + 
WaitForNCoresSealed(t, sec, len(sec.Cores)-1) + EnsureCoresUnsealed(t, sec) + WaitForReplicationStatus(t, sec.Cores[0].Client, true, func(secret map[string]interface{}) bool { return secret["mode"] != nil && secret["mode"] == "secondary" }) } @@ -616,7 +685,7 @@ func WaitForNCoresUnsealed(t testing.T, cluster *vault.TestCluster, n int) { func WaitForNCoresSealed(t testing.T, cluster *vault.TestCluster, n int) { t.Helper() - for i := 0; i < 30; i++ { + for i := 0; i < 60; i++ { sealed := 0 for _, core := range cluster.Cores { if core.Core.Sealed() { @@ -635,13 +704,20 @@ func WaitForNCoresSealed(t testing.T, cluster *vault.TestCluster, n int) { func WaitForActiveNodeAndPerfStandbys(t testing.T, cluster *vault.TestCluster) { t.Helper() + + expectedStandbys := 0 + for _, core := range cluster.Cores[1:] { + if !core.CoreConfig.DisablePerformanceStandby { + expectedStandbys++ + } + } mountPoint, err := uuid.GenerateUUID() if err != nil { t.Fatal(err) } err = cluster.Cores[0].Client.Sys().Mount(mountPoint, &api.MountInput{ Type: "kv", - Local: false, + Local: true, }) if err != nil { t.Fatal("unable to mount KV engine") @@ -652,40 +728,48 @@ func WaitForActiveNodeAndPerfStandbys(t testing.T, cluster *vault.TestCluster) { deadline := time.Now().Add(30 * time.Second) for _, c := range cluster.Cores { wg.Add(1) - go func(client *api.Client) { + go func(core *vault.TestClusterCore) { defer wg.Done() val := 1 for time.Now().Before(deadline) { - _, err = cluster.Cores[0].Client.Logical().Write(path, map[string]interface{}{ + _, err := cluster.Cores[0].Client.Logical().Write(path, map[string]interface{}{ "bar": val, }) - if err != nil { - t.Fatal("unable to write KV", "path", path) - } val++ time.Sleep(250 * time.Millisecond) - leader, err := client.Sys().Leader() + if err != nil { + if strings.Contains(err.Error(), "Vault is sealed") { + continue + } + if strings.Contains(err.Error(), "still catching up to primary") { + continue + } + t.Fatal(err) + } + leader, err := core.Client.Sys().Leader() if err != nil { if strings.Contains(err.Error(), "Vault is sealed") { continue } t.Fatal(err) } - if leader.IsSelf { + switch { + case leader.IsSelf: atomic.AddInt64(&actives, 1) return - } - if leader.PerfStandby && leader.PerfStandbyLastRemoteWAL > 0 { + case leader.LeaderAddress != "" && core.CoreConfig.DisablePerformanceStandby: + return + case leader.PerfStandby && leader.PerfStandbyLastRemoteWAL > 0: atomic.AddInt64(&standbys, 1) return } } - }(c.Client) + }(c) } wg.Wait() - if actives != 1 || int(standbys) != len(cluster.Cores)-1 { + if actives != 1 || int(standbys) != expectedStandbys { t.Fatalf("expected 1 active core and %d standbys, got %d active and %d standbys", - len(cluster.Cores)-1, actives, standbys) + expectedStandbys, actives, standbys) } err = cluster.Cores[0].Client.Sys().Unmount(mountPoint) if err != nil { @@ -833,7 +917,8 @@ func RekeyCluster(t testing.T, cluster *vault.TestCluster) { cluster.BarrierKeys = newBarrierKeys } -func CreateRaftBackend(t testing.T, logger hclog.Logger, nodeID string) (physical.Backend, func(), error) { +func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + nodeID := fmt.Sprintf("core-%d", coreIdx) raftDir, err := ioutil.TempDir("", "vault-raft-") if err != nil { t.Fatal(err) @@ -857,7 +942,11 @@ func CreateRaftBackend(t testing.T, logger hclog.Logger, nodeID string) (physica t.Fatal(err) } - return backend, cleanupFunc, nil + return &vault.PhysicalBackendBundle{ + Backend: backend, + HABackend: 
backend.(physical.HABackend), + Cleanup: cleanupFunc, + } } type TestRaftServerAddressProvider struct { @@ -960,3 +1049,153 @@ func WaitForPerfReplicationWorking(t testing.T, pri, sec *vault.TestCluster) { } t.Fatal("unable to read replicated KV on secondary", "path", path, "err", err) } + +func WaitForDRReplicationWorking(t testing.T, pri, sec *vault.TestCluster) { + priClient, secClient := pri.Cores[0].Client, sec.Cores[0].Client + mountPoint, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + err = priClient.Sys().Mount(mountPoint, &api.MountInput{ + Type: "kv", + Local: false, + }) + if err != nil { + t.Fatal("unable to mount KV engine on primary") + } + + path := mountPoint + "/foo" + _, err = priClient.Logical().Write(path, map[string]interface{}{ + "bar": 1, + }) + if err != nil { + t.Fatal("unable to write KV on primary", "path", path) + } + + WaitForReplicationStatus(t, secClient, true, func(secret map[string]interface{}) bool { + if secret["last_remote_wal"] != nil { + lastRemoteWal, _ := secret["last_remote_wal"].(json.Number).Int64() + return lastRemoteWal > 0 + } + + return false + }) + + err = priClient.Sys().Unmount(mountPoint) + if err != nil { + t.Fatal("unable to unmount KV engine on primary") + } +} + +func MakeInmemBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle { + inm, err := inmem.NewTransactionalInmem(nil, logger) + if err != nil { + t.Fatal(err) + } + inmha, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + + return &vault.PhysicalBackendBundle{ + Backend: inm, + HABackend: inmha.(physical.HABackend), + } +} + +func MakeInmemNonTransactionalBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle { + inm, err := inmem.NewInmem(nil, logger) + if err != nil { + t.Fatal(err) + } + inmha, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + + return &vault.PhysicalBackendBundle{ + Backend: inm, + HABackend: inmha.(physical.HABackend), + } +} + +func MakeFileBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle { + path, err := ioutil.TempDir("", "vault-integ-file-") + if err != nil { + t.Fatal(err) + } + fileConf := map[string]string{ + "path": path, + } + fileBackend, err := physFile.NewTransactionalFileBackend(fileConf, logger) + if err != nil { + t.Fatal(err) + } + + inmha, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + + return &vault.PhysicalBackendBundle{ + Backend: fileBackend, + HABackend: inmha.(physical.HABackend), + Cleanup: func() { + err := os.RemoveAll(path) + if err != nil { + t.Fatal(err) + } + }, + } +} + +func MakeConsulBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle { + cleanup, consulAddress, consulToken := consul.PrepareTestContainer(t.(*realtesting.T), "1.4.0-rc1") + consulConf := map[string]string{ + "address": consulAddress, + "token": consulToken, + "max_parallel": "32", + } + consulBackend, err := physConsul.NewConsulBackend(consulConf, logger) + if err != nil { + t.Fatal(err) + } + return &vault.PhysicalBackendBundle{ + Backend: consulBackend, + Cleanup: cleanup, + } +} + +type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) + +func sharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + if coreIdx == 0 { + return f(t, logger) + } + 
return nil + } +} + +func InmemBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts.PhysicalFactory = sharedPhysicalFactory(MakeInmemBackend) +} +func InmemNonTransactionalBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts.PhysicalFactory = sharedPhysicalFactory(MakeInmemNonTransactionalBackend) +} +func FileBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts.PhysicalFactory = sharedPhysicalFactory(MakeFileBackend) +} +func ConsulBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts.PhysicalFactory = sharedPhysicalFactory(MakeConsulBackend) +} + +func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + conf.DisablePerformanceStandby = true + opts.KeepStandbysSealed = true + opts.PhysicalFactory = MakeRaftBackend + opts.SetupFunc = func(t testing.T, c *vault.TestCluster) { + RaftClusterJoinNodes(t, c) + time.Sleep(15 * time.Second) + } +} diff --git a/vault/external_tests/raft/raft_test.go b/vault/external_tests/raft/raft_test.go index 8c4d0ad71a..a0d921e9f3 100644 --- a/vault/external_tests/raft/raft_test.go +++ b/vault/external_tests/raft/raft_test.go @@ -6,50 +6,35 @@ import ( "io/ioutil" "net/http" "strings" - "sync" "sync/atomic" "testing" "time" - cleanhttp "github.com/hashicorp/go-cleanhttp" - hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/vault/api" "github.com/hashicorp/vault/helper/testhelpers" vaulthttp "github.com/hashicorp/vault/http" "github.com/hashicorp/vault/physical/raft" - "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/vault" "golang.org/x/net/http2" ) +func raftCluster(t *testing.T) *vault.TestCluster { + var conf vault.CoreConfig + var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler} + testhelpers.RaftBackendSetup(&conf, &opts) + cluster := vault.NewTestCluster(t, &conf, &opts) + cluster.Start() + vault.TestWaitActive(t, cluster.Cores[0].Core) + return cluster +} + func TestRaft_Join(t *testing.T) { - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() + var conf vault.CoreConfig + var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler} + testhelpers.RaftBackendSetup(&conf, &opts) + opts.SetupFunc = nil + cluster := vault.NewTestCluster(t, &conf, &opts) cluster.Start() defer cluster.Cleanup() @@ -107,38 +92,9 @@ func TestRaft_Join(t *testing.T) { } func TestRaft_RemovePeer(t *testing.T) { - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, 
error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() + cluster := raftCluster(t) defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) - for i, c := range cluster.Cores { if c.Core.Sealed() { t.Fatalf("failed to unseal core %d", i) @@ -194,38 +150,9 @@ func TestRaft_RemovePeer(t *testing.T) { } func TestRaft_Configuration(t *testing.T) { - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() + cluster := raftCluster(t) defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) - for i, c := range cluster.Cores { if c.Core.Sealed() { t.Fatalf("failed to unseal core %d", i) @@ -269,38 +196,9 @@ func TestRaft_Configuration(t *testing.T) { } func TestRaft_ShamirUnseal(t *testing.T) { - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() + cluster := raftCluster(t) defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) - for i, c := range cluster.Cores { if c.Core.Sealed() { t.Fatalf("failed to unseal core %d", i) @@ -309,38 +207,9 @@ func TestRaft_ShamirUnseal(t *testing.T) { } func TestRaft_SnapshotAPI(t *testing.T) { - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() + cluster 
:= raftCluster(t) defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) - leaderClient := cluster.Cores[0].Client // Write a few keys @@ -443,38 +312,9 @@ func TestRaft_SnapshotAPI_RekeyRotate_Backward(t *testing.T) { // bind locally tCaseLocal := tCase t.Parallel() - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - Name: tCaseLocal.Name, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() - defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) + cluster := raftCluster(t) + defer cluster.Cleanup() leaderClient := cluster.Cores[0].Client @@ -630,38 +470,9 @@ func TestRaft_SnapshotAPI_RekeyRotate_Forward(t *testing.T) { // bind locally tCaseLocal := tCase t.Parallel() - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - Name: tCaseLocal.Name, - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() - defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) + cluster := raftCluster(t) + defer cluster.Cleanup() leaderClient := cluster.Cores[0].Client @@ -828,40 +639,9 @@ func TestRaft_SnapshotAPI_RekeyRotate_Forward(t *testing.T) { } func TestRaft_SnapshotAPI_DifferentCluster(t *testing.T) { - - var cleanupFuncs []func() - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Mutex: &sync.Mutex{}, - Name: "cluster1", - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - defer func() { - for _, c := range cleanupFuncs { - c() - } - }() - cluster.Start() + cluster := raftCluster(t) defer cluster.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster) - leaderClient := cluster.Cores[0].Client // Write a few keys @@ -905,33 +685,9 @@ func TestRaft_SnapshotAPI_DifferentCluster(t *testing.T) { // Cluster 2 { - logger := hclog.New(&hclog.LoggerOptions{ - Level: 
hclog.Trace, - Mutex: &sync.Mutex{}, - Name: "cluster2", - }) - coreConfig := &vault.CoreConfig{ - Logger: logger, - // TODO: remove this later - DisablePerformanceStandby: true, - } - i := 0 - cluster2 := vault.NewTestCluster(t, coreConfig, &vault.TestClusterOptions{ - PhysicalFactory: func(logger hclog.Logger) (physical.Backend, error) { - backend, cleanupFunc, err := testhelpers.CreateRaftBackend(t, logger, fmt.Sprintf("core-%d", i)) - i++ - cleanupFuncs = append(cleanupFuncs, cleanupFunc) - return backend, err - }, - Logger: logger, - KeepStandbysSealed: true, - HandlerFunc: vaulthttp.Handler, - }) - cluster2.Start() + cluster2 := raftCluster(t) defer cluster2.Cleanup() - testhelpers.RaftClusterJoinNodes(t, cluster2) - leaderClient := cluster2.Cores[0].Client transport := cleanhttp.DefaultPooledTransport() diff --git a/vault/testing.go b/vault/testing.go index e614552d1b..653069fe5f 100644 --- a/vault/testing.go +++ b/vault/testing.go @@ -797,6 +797,8 @@ type TestCluster struct { TempDir string ClientAuthRequired bool Logger log.Logger + CleanupFunc func() + SetupFunc func() } func (c *TestCluster) Start() { @@ -807,6 +809,9 @@ func (c *TestCluster) Start() { } } } + if c.SetupFunc != nil { + c.SetupFunc() + } } // UnsealCores uses the cluster barrier keys to unseal the test cluster cores @@ -947,6 +952,9 @@ func (c *TestCluster) Cleanup() { // Give time to actually shut down/clean up before the next test time.Sleep(time.Second) + if c.CleanupFunc != nil { + c.CleanupFunc() + } } func (c *TestCluster) ensureCoresSealed() error { @@ -1018,6 +1026,12 @@ type TestClusterCore struct { NodeID string } +type PhysicalBackendBundle struct { + Backend physical.Backend + HABackend physical.HABackend + Cleanup func() +} + type TestClusterOptions struct { KeepStandbysSealed bool SkipInit bool @@ -1029,9 +1043,18 @@ type TestClusterOptions struct { TempDir string CACert []byte CAKey *ecdsa.PrivateKey - PhysicalFactory func(hclog.Logger) (physical.Backend, error) - FirstCoreNumber int - RequireClientAuth bool + // PhysicalFactory is used to create backends. + // The int argument is the index of the core within the cluster, i.e. first + // core in cluster will have 0, second 1, etc. + // If the backend is shared across the cluster (i.e. is not Raft) then it + // should return nil when coreIdx != 0. + PhysicalFactory func(t testing.T, coreIdx int, logger hclog.Logger) *PhysicalBackendBundle + // FirstCoreNumber is used to assign a unique number to each core within + // a multi-cluster setup. + FirstCoreNumber int + RequireClientAuth bool + // SetupFunc is called after the cluster is started. 
+ SetupFunc func(t testing.T, c *TestCluster) } var DefaultNumCores = 3 @@ -1414,6 +1437,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te t.Fatalf("err: %v", err) } + cleanupFuncs := []func(){} cores := []*Core{} coreConfigs := []*CoreConfig{} for i := 0; i < numCores; i++ { @@ -1428,15 +1452,28 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te if coreConfig.Logger == nil || (opts != nil && opts.Logger != nil) { localConfig.Logger = testCluster.Logger.Named(fmt.Sprintf("core%d", i)) } - if opts != nil && opts.PhysicalFactory != nil { - localConfig.Physical, err = opts.PhysicalFactory(localConfig.Logger) - if err != nil { - t.Fatalf("err: %v", err) - } - - if haPhysical, ok := localConfig.Physical.(physical.HABackend); ok { - localConfig.HAPhysical = haPhysical + physBundle := opts.PhysicalFactory(t, i, localConfig.Logger) + switch { + case physBundle == nil && coreConfig.Physical != nil: + case physBundle == nil && coreConfig.Physical == nil: + t.Fatal("PhysicalFactory produced no physical and none in CoreConfig") + case physBundle != nil: + testCluster.Logger.Info("created physical backend", "instance", i) + coreConfig.Physical = physBundle.Backend + localConfig.Physical = physBundle.Backend + base.Physical = physBundle.Backend + haBackend := physBundle.HABackend + if haBackend == nil { + if ha, ok := physBundle.Backend.(physical.HABackend); ok { + haBackend = ha + } + } + coreConfig.HAPhysical = haBackend + localConfig.HAPhysical = haBackend + if physBundle.Cleanup != nil { + cleanupFuncs = append(cleanupFuncs, physBundle.Cleanup) + } } } @@ -1698,6 +1735,19 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te testExtraClusterCoresTestSetup(t, priKey, testCluster.Cores) + testCluster.CleanupFunc = func() { + for _, c := range cleanupFuncs { + c() + } + } + if opts != nil { + if opts.SetupFunc != nil { + testCluster.SetupFunc = func() { + opts.SetupFunc(t, &testCluster) + } + } + } + return &testCluster }
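
Illustrative usage (editor's sketch, not part of the applied diff): with the generalized PhysicalFactory, a test selects its storage backend by applying one of the ClusterSetupMutator helpers before building the cluster. The test below is hypothetical, but it only calls APIs that appear in this patch (FileBackendSetup, NewTestCluster, TestWaitActive); compare it with the raftCluster helper in raft_test.go, which does the same with the per-core raft factory.

package storage_test // hypothetical test package

import (
	"testing"

	"github.com/hashicorp/vault/helper/testhelpers"
	vaulthttp "github.com/hashicorp/vault/http"
	"github.com/hashicorp/vault/vault"
)

// TestCluster_FileBackend (hypothetical) mirrors the raftCluster helper
// above, but with the shared (per-cluster) file backend: FileBackendSetup
// installs a PhysicalFactory that builds one backend bundle for coreIdx 0
// and returns nil for the other cores, so all cores share one backend.
func TestCluster_FileBackend(t *testing.T) {
	var conf vault.CoreConfig
	opts := vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
	testhelpers.FileBackendSetup(&conf, &opts)

	cluster := vault.NewTestCluster(t, &conf, &opts)
	cluster.Start()         // also runs opts.SetupFunc, if one was installed
	defer cluster.Cleanup() // CleanupFunc in turn runs the bundle's Cleanup
	vault.TestWaitActive(t, cluster.Cores[0].Core)
}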
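Similarly, a minimal sketch of the new two-cluster DR path, in the same hypothetical test file as the sketch above. It relies only on helpers visible in this diff, and cleans up each cluster individually since TestCluster.Cleanup is the mechanism the diff shows.

// TestCluster_DRReplication (hypothetical) builds a perf primary plus a DR
// secondary and enables DR replication between them; by the time
// GetDRReplicatedClusters returns, WaitForDRReplicationWorking has verified
// that writes on the primary reach the DR secondary.
func TestCluster_DRReplication(t *testing.T) {
	clusters := testhelpers.GetDRReplicatedClusters(t, &vault.CoreConfig{},
		&vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler})
	defer clusters.PerfPrimaryCluster.Cleanup()
	defer clusters.PerfPrimaryDRCluster.Cleanup()

	// Both clusters are now ready for test logic that exercises DR, e.g.
	// sealing the primary's cores and promoting the DR secondary.
}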