From 13c4bbf9d7c0a6df2c995b8e1682c615509fa704 Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Sat, 30 Jul 2016 13:17:29 -0400 Subject: [PATCH 1/7] Add waitgroup wait to allow physical consul to deregister checks --- command/server.go | 60 ++++++++++++++++++++++++++------------------ physical/consul.go | 9 ++++--- physical/physical.go | 3 ++- 3 files changed, 43 insertions(+), 29 deletions(-) diff --git a/command/server.go b/command/server.go index 19f97b9a74..13fc449cec 100644 --- a/command/server.go +++ b/command/server.go @@ -13,6 +13,7 @@ import ( "sort" "strconv" "strings" + "sync" "syscall" "time" @@ -43,6 +44,8 @@ type ServerCommand struct { ShutdownCh chan struct{} SighupCh chan struct{} + WaitGroup *sync.WaitGroup + meta.Meta logger *log.Logger @@ -308,31 +311,6 @@ func (c *ServerCommand) Run(args []string) int { } } - // If the backend supports service discovery, run service discovery - if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { - sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) - if ok { - activeFunc := func() bool { - if isLeader, _, err := core.Leader(); err == nil { - return isLeader - } - return false - } - - sealedFunc := func() bool { - if sealed, err := core.Sealed(); err == nil { - return sealed - } - return true - } - - if err := sd.RunServiceDiscovery(c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { - c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) - return 1 - } - } - } - // Initialize the listeners lns := make([]net.Listener, 0, len(config.Listeners)) for i, lnConfig := range config.Listeners { @@ -392,6 +370,37 @@ func (c *ServerCommand) Run(args []string) int { return 0 } + // Perform service discovery registrations and initialization of + // HTTP server after the verifyOnly check. + + // Instantiate the wait group + c.WaitGroup = &sync.WaitGroup{} + + // If the backend supports service discovery, run service discovery + if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { + sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) + if ok { + activeFunc := func() bool { + if isLeader, _, err := core.Leader(); err == nil { + return isLeader + } + return false + } + + sealedFunc := func() bool { + if sealed, err := core.Sealed(); err == nil { + return sealed + } + return true + } + + if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) + return 1 + } + } + } + // Initialize the HTTP server server := &http.Server{} server.Handler = vaulthttp.Handler(core) @@ -428,6 +437,7 @@ func (c *ServerCommand) Run(args []string) int { } } + c.WaitGroup.Wait() return 0 } diff --git a/physical/consul.go b/physical/consul.go index dc10e5741a..c379dd64de 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -416,17 +416,19 @@ func (c *ConsulBackend) checkDuration() time.Duration { return lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) } -func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { +func (c *ConsulBackend) RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { if err := c.setAdvertiseAddr(advertiseAddr); err != nil { return err } - go c.runEventDemuxer(shutdownCh, advertiseAddr, activeFunc, sealedFunc) + waitGroup.Add(1) + + go c.runEventDemuxer(waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) return nil } -func (c *ConsulBackend) runEventDemuxer(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { +func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { // Fire the reconcileTimer immediately upon starting the event demuxer reconcileTimer := time.NewTimer(0) defer reconcileTimer.Stop() @@ -516,6 +518,7 @@ shutdown: if err := c.client.Agent().ServiceDeregister(registeredServiceID); err != nil { c.logger.Printf("[WARN]: physical/consul: service deregistration failed: %v", err) } + defer waitGroup.Done() } // checkID returns the ID used for a Consul Check. Assume at least a read diff --git a/physical/physical.go b/physical/physical.go index ff74c9827d..9e96beb6d8 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -3,6 +3,7 @@ package physical import ( "fmt" "log" + "sync" ) const DefaultParallelOperations = 128 @@ -71,7 +72,7 @@ type ServiceDiscovery interface { // Run executes any background service discovery tasks until the // shutdown channel is closed. - RunServiceDiscovery(shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error + RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error } type Lock interface { From 461c30969ebf67bbb5bbf8a98772b2af4049a5ce Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Sun, 31 Jul 2016 10:09:16 -0400 Subject: [PATCH 2/7] Sharing shutdown message with physical consul backend --- command/server.go | 6 +++--- physical/consul.go | 9 +++++---- physical/physical.go | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/command/server.go b/command/server.go index 13fc449cec..1e1d39b6ea 100644 --- a/command/server.go +++ b/command/server.go @@ -375,6 +375,8 @@ func (c *ServerCommand) Run(args []string) int { // Instantiate the wait group c.WaitGroup = &sync.WaitGroup{} + // Wait for shutdown + shutdownTriggered := false // If the backend supports service discovery, run service discovery if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { @@ -394,7 +396,7 @@ func (c *ServerCommand) Run(args []string) int { return true } - if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { + if err := sd.RunServiceDiscovery(&shutdownTriggered, c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) return 1 } @@ -419,8 +421,6 @@ func (c *ServerCommand) Run(args []string) int { // Release the log gate. logGate.Flush() - // Wait for shutdown - shutdownTriggered := false for !shutdownTriggered { select { case <-c.ShutdownCh: diff --git a/physical/consul.go b/physical/consul.go index c379dd64de..754a314c3b 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -416,19 +416,19 @@ func (c *ConsulBackend) checkDuration() time.Duration { return lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) } -func (c *ConsulBackend) RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { +func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { if err := c.setAdvertiseAddr(advertiseAddr); err != nil { return err } waitGroup.Add(1) - go c.runEventDemuxer(waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) + go c.runEventDemuxer(shutdownTriggered, waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) return nil } -func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { +func (c *ConsulBackend) runEventDemuxer(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { // Fire the reconcileTimer immediately upon starting the event demuxer reconcileTimer := time.NewTimer(0) defer reconcileTimer.Stop() @@ -453,7 +453,7 @@ func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh Sh var registeredServiceID string var serviceRegLock int64 shutdown: - for { + for !shutdown { select { case <-c.notifyActiveCh: // Run reconcile immediately upon active state change notification @@ -511,6 +511,7 @@ shutdown: shutdown = true break shutdown } + shutdown = *shutdownTriggered } c.serviceLock.RLock() diff --git a/physical/physical.go b/physical/physical.go index 9e96beb6d8..e563e80e33 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -72,7 +72,7 @@ type ServiceDiscovery interface { // Run executes any background service discovery tasks until the // shutdown channel is closed. - RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error + RunServiceDiscovery(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error } type Lock interface { From 5318130ba2da1be411d9e8baa252b90b7c9b5338 Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Mon, 1 Aug 2016 10:24:27 -0400 Subject: [PATCH 3/7] Make the defer statement of waitgroup to execute last --- command/server.go | 1 + physical/consul.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/command/server.go b/command/server.go index 1e1d39b6ea..f7b0ce9a92 100644 --- a/command/server.go +++ b/command/server.go @@ -437,6 +437,7 @@ func (c *ServerCommand) Run(args []string) int { } } + // Wait for dependant goroutines to complete c.WaitGroup.Wait() return 0 } diff --git a/physical/consul.go b/physical/consul.go index 754a314c3b..450c3bcc97 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -421,6 +421,7 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup * return err } + // 'server' command will wait for the belog goroutine to complete waitGroup.Add(1) go c.runEventDemuxer(shutdownTriggered, waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) @@ -429,6 +430,9 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup * } func (c *ConsulBackend) runEventDemuxer(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { + // This defer statement should be executed last. So push it first. + defer waitGroup.Done() + // Fire the reconcileTimer immediately upon starting the event demuxer reconcileTimer := time.NewTimer(0) defer reconcileTimer.Stop() @@ -519,7 +523,6 @@ shutdown: if err := c.client.Agent().ServiceDeregister(registeredServiceID); err != nil { c.logger.Printf("[WARN]: physical/consul: service deregistration failed: %v", err) } - defer waitGroup.Done() } // checkID returns the ID used for a Consul Check. Assume at least a read From b0ee8869fc2c22bdae83b21604693ac149b7a662 Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Mon, 1 Aug 2016 10:32:29 -0400 Subject: [PATCH 4/7] Fix physical/consul test case --- physical/consul_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/physical/consul_test.go b/physical/consul_test.go index 0687027e60..92929cc6e8 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "os" "reflect" + "sync" "testing" "time" @@ -195,7 +196,9 @@ func TestConsul_newConsulBackend(t *testing.T) { } var shutdownCh ShutdownChannel - if err := c.RunServiceDiscovery(shutdownCh, test.advertiseAddr, testActiveFunc(0.5), testSealedFunc(0.5)); err != nil { + waitGroup := &sync.WaitGroup{} + shutdown := false + if err := c.RunServiceDiscovery(&shutdown, waitGroup, shutdownCh, test.advertiseAddr, testActiveFunc(0.5), testSealedFunc(0.5)); err != nil { t.Fatalf("bad: %v", err) } From 577cd9de353426814dc069244f4228a068efb08c Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Mon, 1 Aug 2016 11:15:25 -0400 Subject: [PATCH 5/7] Address review feedback --- command/server.go | 2 +- physical/consul.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/command/server.go b/command/server.go index f7b0ce9a92..079ca6db06 100644 --- a/command/server.go +++ b/command/server.go @@ -437,7 +437,7 @@ func (c *ServerCommand) Run(args []string) int { } } - // Wait for dependant goroutines to complete + // Wait for dependent goroutines to complete c.WaitGroup.Wait() return 0 } diff --git a/physical/consul.go b/physical/consul.go index 450c3bcc97..9c70f6f826 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -421,7 +421,7 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup * return err } - // 'server' command will wait for the belog goroutine to complete + // 'server' command will wait for the below goroutine to complete waitGroup.Add(1) go c.runEventDemuxer(shutdownTriggered, waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) From 32b39e808b1c8a69f34da549b95c8cd97a5d4ce2 Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Mon, 1 Aug 2016 11:58:45 -0400 Subject: [PATCH 6/7] Close the shutdown channel instead of sending a value down --- command/server.go | 13 ++++++------- physical/consul.go | 10 ++++------ physical/physical.go | 2 +- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/command/server.go b/command/server.go index 079ca6db06..df05bc59a7 100644 --- a/command/server.go +++ b/command/server.go @@ -375,8 +375,6 @@ func (c *ServerCommand) Run(args []string) int { // Instantiate the wait group c.WaitGroup = &sync.WaitGroup{} - // Wait for shutdown - shutdownTriggered := false // If the backend supports service discovery, run service discovery if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { @@ -396,7 +394,7 @@ func (c *ServerCommand) Run(args []string) int { return true } - if err := sd.RunServiceDiscovery(&shutdownTriggered, c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { + if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.AdvertiseAddr, activeFunc, sealedFunc); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) return 1 } @@ -421,6 +419,9 @@ func (c *ServerCommand) Run(args []string) int { // Release the log gate. logGate.Flush() + // Wait for shutdown + shutdownTriggered := false + for !shutdownTriggered { select { case <-c.ShutdownCh: @@ -757,10 +758,8 @@ func MakeShutdownCh() chan struct{} { shutdownCh := make(chan os.Signal, 4) signal.Notify(shutdownCh, os.Interrupt, syscall.SIGTERM) go func() { - for { - <-shutdownCh - resultCh <- struct{}{} - } + <-shutdownCh + close(resultCh) }() return resultCh } diff --git a/physical/consul.go b/physical/consul.go index 9c70f6f826..5f88a31941 100644 --- a/physical/consul.go +++ b/physical/consul.go @@ -416,7 +416,7 @@ func (c *ConsulBackend) checkDuration() time.Duration { return lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) } -func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { +func (c *ConsulBackend) RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) (err error) { if err := c.setAdvertiseAddr(advertiseAddr); err != nil { return err } @@ -424,12 +424,12 @@ func (c *ConsulBackend) RunServiceDiscovery(shutdownTriggered *bool, waitGroup * // 'server' command will wait for the below goroutine to complete waitGroup.Add(1) - go c.runEventDemuxer(shutdownTriggered, waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) + go c.runEventDemuxer(waitGroup, shutdownCh, advertiseAddr, activeFunc, sealedFunc) return nil } -func (c *ConsulBackend) runEventDemuxer(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { +func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) { // This defer statement should be executed last. So push it first. defer waitGroup.Done() @@ -456,7 +456,7 @@ func (c *ConsulBackend) runEventDemuxer(shutdownTriggered *bool, waitGroup *sync var checkLock int64 var registeredServiceID string var serviceRegLock int64 -shutdown: + for !shutdown { select { case <-c.notifyActiveCh: @@ -513,9 +513,7 @@ shutdown: case <-shutdownCh: c.logger.Printf("[INFO]: physical/consul: Shutting down consul backend") shutdown = true - break shutdown } - shutdown = *shutdownTriggered } c.serviceLock.RLock() diff --git a/physical/physical.go b/physical/physical.go index e563e80e33..9e96beb6d8 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -72,7 +72,7 @@ type ServiceDiscovery interface { // Run executes any background service discovery tasks until the // shutdown channel is closed. - RunServiceDiscovery(shutdownTriggered *bool, waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error + RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, advertiseAddr string, activeFunc activeFunction, sealedFunc sealedFunction) error } type Lock interface { From c9b80ae853f1730b3d776cc357d96ae7eeb245af Mon Sep 17 00:00:00 2001 From: vishalnayak Date: Mon, 1 Aug 2016 12:20:38 -0400 Subject: [PATCH 7/7] Fixed the test after removing shutdown bool --- physical/consul_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/physical/consul_test.go b/physical/consul_test.go index 92929cc6e8..f382caaa63 100644 --- a/physical/consul_test.go +++ b/physical/consul_test.go @@ -197,8 +197,7 @@ func TestConsul_newConsulBackend(t *testing.T) { var shutdownCh ShutdownChannel waitGroup := &sync.WaitGroup{} - shutdown := false - if err := c.RunServiceDiscovery(&shutdown, waitGroup, shutdownCh, test.advertiseAddr, testActiveFunc(0.5), testSealedFunc(0.5)); err != nil { + if err := c.RunServiceDiscovery(waitGroup, shutdownCh, test.advertiseAddr, testActiveFunc(0.5), testSealedFunc(0.5)); err != nil { t.Fatalf("bad: %v", err) }