mirror of
https://github.com/hashicorp/vault.git
synced 2025-08-25 08:31:09 +02:00
Add a cleanLeaderPrefix function to clean up stale leader entries in core/leader
Fixes #679.
This commit is contained in:
parent
9299ab2593
commit
f0ce939f66
@ -6,8 +6,9 @@ DEPRECATIONS/BREAKING CHANGES:
|
|||||||
|
|
||||||
IMPROVEMENTS:
|
IMPROVEMENTS:
|
||||||
|
|
||||||
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
|
|
||||||
* core: Tokens can now renew themselves [GH-455]
|
* core: Tokens can now renew themselves [GH-455]
|
||||||
|
* core: Stale leader entries will now be reaped [GH-679]
|
||||||
|
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
|
||||||
* logical: Responses now contain a "warnings" key containing a list of warnings returned from the server. These are conditions that did not require failing an operation, but of which the client should be aware. [GH-676]
|
* logical: Responses now contain a "warnings" key containing a list of warnings returned from the server. These are conditions that did not require failing an operation, but of which the client should be aware. [GH-676]
|
||||||
|
|
||||||
BUG FIXES:
|
BUG FIXES:
|
||||||
|
@ -239,6 +239,10 @@ type Core struct {
|
|||||||
// metricsCh is used to stop the metrics streaming
|
// metricsCh is used to stop the metrics streaming
|
||||||
metricsCh chan struct{}
|
metricsCh chan struct{}
|
||||||
|
|
||||||
|
// leaderPrefixCleanupCh is used to stop
|
||||||
|
// the leader prefix cleansing operation
|
||||||
|
leaderPrefixCleanupCh chan struct{}
|
||||||
|
|
||||||
defaultLeaseTTL time.Duration
|
defaultLeaseTTL time.Duration
|
||||||
maxLeaseTTL time.Duration
|
maxLeaseTTL time.Duration
|
||||||
|
|
||||||
@ -1603,6 +1607,8 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st
|
|||||||
|
|
||||||
// advertiseLeader is used to advertise the current node as leader
|
// advertiseLeader is used to advertise the current node as leader
|
||||||
func (c *Core) advertiseLeader(uuid string) error {
|
func (c *Core) advertiseLeader(uuid string) error {
|
||||||
|
c.leaderPrefixCleanupCh = make(chan struct{})
|
||||||
|
go c.cleanLeaderPrefix(uuid)
|
||||||
ent := &Entry{
|
ent := &Entry{
|
||||||
Key: coreLeaderPrefix + uuid,
|
Key: coreLeaderPrefix + uuid,
|
||||||
Value: []byte(c.advertiseAddr),
|
Value: []byte(c.advertiseAddr),
|
||||||
@ -1610,8 +1616,35 @@ func (c *Core) advertiseLeader(uuid string) error {
|
|||||||
return c.barrier.Put(ent)
|
return c.barrier.Put(ent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Core) cleanLeaderPrefix(uuid string) {
|
||||||
|
keys, err := c.barrier.List(coreLeaderPrefix)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Printf("[ERR] core: failed to list entries in core/leader: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(keys) == 0 {
|
||||||
|
c.logger.Print("[ERR] core: found no entries under core/leader (should have found our own)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
if keys[0] != uuid {
|
||||||
|
c.barrier.Delete(coreLeaderPrefix + keys[0])
|
||||||
|
}
|
||||||
|
keys = keys[1:]
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-c.leaderPrefixCleanupCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// clearLeader is used to clear our leadership entry
|
// clearLeader is used to clear our leadership entry
|
||||||
func (c *Core) clearLeader(uuid string) error {
|
func (c *Core) clearLeader(uuid string) error {
|
||||||
|
close(c.leaderPrefixCleanupCh)
|
||||||
key := coreLeaderPrefix + uuid
|
key := coreLeaderPrefix + uuid
|
||||||
return c.barrier.Delete(key)
|
return c.barrier.Delete(key)
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/otto/helper/uuid"
|
||||||
"github.com/hashicorp/vault/audit"
|
"github.com/hashicorp/vault/audit"
|
||||||
"github.com/hashicorp/vault/logical"
|
"github.com/hashicorp/vault/logical"
|
||||||
"github.com/hashicorp/vault/physical"
|
"github.com/hashicorp/vault/physical"
|
||||||
@ -1068,6 +1069,152 @@ func TestCore_LimitedUseToken(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCore_CleanLeaderPrefix(t *testing.T) {
|
||||||
|
// Create the first core and initialize it
|
||||||
|
inm := physical.NewInmemHA()
|
||||||
|
advertiseOriginal := "http://127.0.0.1:8200"
|
||||||
|
core, err := NewCore(&CoreConfig{
|
||||||
|
Physical: inm,
|
||||||
|
AdvertiseAddr: advertiseOriginal,
|
||||||
|
DisableMlock: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
key, root := TestCoreInit(t, core)
|
||||||
|
if _, err := core.Unseal(TestKeyCopy(key)); err != nil {
|
||||||
|
t.Fatalf("unseal err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify unsealed
|
||||||
|
sealed, err := core.Sealed()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err checking seal status: %s", err)
|
||||||
|
}
|
||||||
|
if sealed {
|
||||||
|
t.Fatal("should not be sealed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for core to become active
|
||||||
|
testWaitActive(t, core)
|
||||||
|
|
||||||
|
// Ensure that the original clean function has stopped running
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// Put several random entries
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
core.barrier.Put(&Entry{
|
||||||
|
Key: coreLeaderPrefix + uuid.GenerateUUID(),
|
||||||
|
Value: []byte(uuid.GenerateUUID()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := core.barrier.List(coreLeaderPrefix)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(entries) != 6 {
|
||||||
|
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the leader is local
|
||||||
|
isLeader, advertise, err := core.Leader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !isLeader {
|
||||||
|
t.Fatalf("should be leader")
|
||||||
|
}
|
||||||
|
if advertise != advertiseOriginal {
|
||||||
|
t.Fatalf("Bad advertise: %v", advertise)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a second core, attached to same in-memory store
|
||||||
|
advertiseOriginal2 := "http://127.0.0.1:8500"
|
||||||
|
core2, err := NewCore(&CoreConfig{
|
||||||
|
Physical: inm,
|
||||||
|
AdvertiseAddr: advertiseOriginal2,
|
||||||
|
DisableMlock: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := core2.Unseal(TestKeyCopy(key)); err != nil {
|
||||||
|
t.Fatalf("unseal err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify unsealed
|
||||||
|
sealed, err = core2.Sealed()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err checking seal status: %s", err)
|
||||||
|
}
|
||||||
|
if sealed {
|
||||||
|
t.Fatal("should not be sealed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Core2 should be in standby
|
||||||
|
standby, err := core2.Standby()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !standby {
|
||||||
|
t.Fatalf("should be standby")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the leader is not local
|
||||||
|
isLeader, advertise, err = core2.Leader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if isLeader {
|
||||||
|
t.Fatalf("should not be leader")
|
||||||
|
}
|
||||||
|
if advertise != advertiseOriginal {
|
||||||
|
t.Fatalf("Bad advertise: %v", advertise)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seal the first core, should step down
|
||||||
|
err = core.Seal(root)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Core should be in standby
|
||||||
|
standby, err = core.Standby()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !standby {
|
||||||
|
t.Fatalf("should be standby")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for core2 to become active
|
||||||
|
testWaitActive(t, core2)
|
||||||
|
|
||||||
|
// Check the leader is local
|
||||||
|
isLeader, advertise, err = core2.Leader()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !isLeader {
|
||||||
|
t.Fatalf("should be leader")
|
||||||
|
}
|
||||||
|
if advertise != advertiseOriginal2 {
|
||||||
|
t.Fatalf("Bad advertise: %v", advertise)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give time for the entries to clear out; it is conservative at 1/second
|
||||||
|
time.Sleep(7 * time.Second)
|
||||||
|
|
||||||
|
entries, err = core2.barrier.List(coreLeaderPrefix)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(entries) != 1 {
|
||||||
|
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCore_Standby(t *testing.T) {
|
func TestCore_Standby(t *testing.T) {
|
||||||
// Create the first core and initialize it
|
// Create the first core and initialize it
|
||||||
inm := physical.NewInmemHA()
|
inm := physical.NewInmemHA()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user