From e7f6344d97f79067fd3172e88f76795944b78b4c Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Mon, 26 Oct 2020 21:07:28 +0300 Subject: [PATCH] fix: stop etcd on any path on upgrade The problem was that etcd stop was only happening in `LeaveEtcd`, thus upgrade with preserve was never stopping etcd leaving ephemeral partition still busy. Refactored code which was stopping service, shutting down all the services to provide the interface we need: * stop a service without considering reverse dependencies (force); * stop a service (services) waiting for reverse dependencies; * shutdown all the services waiting for reverse dependencies. Signed-off-by: Andrey Smirnov --- .../v1alpha1/v1alpha1_sequencer_tasks.go | 10 +- internal/app/machined/pkg/system/system.go | 116 ++++++++++-------- 2 files changed, 70 insertions(+), 56 deletions(-) diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index cdaebc3d3..8c7872e9d 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -667,13 +667,7 @@ func StartAllServices(seq runtime.Sequence, data interface{}) (runtime.TaskExecu // StopServicesForUpgrade represents the StopServicesForUpgrade task. func StopServicesForUpgrade(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { - for _, service := range []string{"kubelet", "cri", "udevd"} { - if err = system.Services(nil).Stop(ctx, service); err != nil { - return err - } - } - - return nil + return system.Services(nil).StopWithRevDepenencies(ctx, "cri", "etcd", "kubelet", "udevd") }, "stopServicesForUpgrade" } @@ -1186,7 +1180,7 @@ func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFun // RemoveAllPods represents the task for stopping all pods. func RemoveAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { - if err = system.Services(nil).Stop(context.Background(), "kubelet"); err != nil { + if err = system.Services(nil).Stop(ctx, "kubelet"); err != nil { return err } diff --git a/internal/app/machined/pkg/system/system.go b/internal/app/machined/pkg/system/system.go index 32c0ce82b..e85f26ada 100644 --- a/internal/app/machined/pkg/system/system.go +++ b/internal/app/machined/pkg/system/system.go @@ -7,6 +7,7 @@ package system import ( "context" "fmt" + "log" "sort" "sync" "time" @@ -202,20 +203,71 @@ func (s *singleton) Shutdown() { return } - stateCopy := make(map[string]*ServiceRunner) s.terminating = true - for name, svcrunner := range s.state { - stateCopy[name] = svcrunner + _ = s.stopServices(context.Background(), nil, true) //nolint: errcheck +} + +// Stop will initiate a shutdown of the specified service. +func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) { + if len(serviceIDs) == 0 { + return } + + s.mu.Lock() + if s.terminating { + s.mu.Unlock() + return nil + } + + return s.stopServices(ctx, serviceIDs, false) +} + +// StopWithRevDepenencies will initiate a shutdown of the specified services waiting for reverse dependencies to finish first. +// +// If reverse dependency is not stopped, this method might block waiting on it being stopped forever. +func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...string) (err error) { + if len(serviceIDs) == 0 { + return + } + + s.mu.Lock() + if s.terminating { + s.mu.Unlock() + return nil + } + + return s.stopServices(ctx, serviceIDs, true) +} + +//nolint: gocyclo +func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error { + stateCopy := make(map[string]*ServiceRunner) + + if services == nil { + for name, svcrunner := range s.state { + stateCopy[name] = svcrunner + } + } else { + for _, name := range services { + if _, ok := s.state[name]; !ok { + continue + } + + stateCopy[name] = s.state[name] + } + } + s.mu.Unlock() // build reverse dependencies reverseDependencies := make(map[string][]string) - for name, svcrunner := range stateCopy { - for _, dependency := range svcrunner.service.DependsOn(s.runtime) { - reverseDependencies[dependency] = append(reverseDependencies[dependency], name) + if waitForRevDependencies { + for name, svcrunner := range stateCopy { + for _, dependency := range svcrunner.service.DependsOn(s.runtime) { + reverseDependencies[dependency] = append(reverseDependencies[dependency], name) + } } } @@ -223,12 +275,16 @@ func (s *singleton) Shutdown() { var shutdownWg sync.WaitGroup // wait max 30 seconds for reverse deps to shut down - shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) + shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, 30*time.Second) defer shutdownCtxCancel() + stoppedConds := []conditions.Condition{} + for name, svcrunner := range stateCopy { shutdownWg.Add(1) + stoppedConds = append(stoppedConds, WaitForService(StateEventDown, name)) + go func(svcrunner *ServiceRunner, reverseDeps []string) { defer shutdownWg.Done() @@ -238,8 +294,10 @@ func (s *singleton) Shutdown() { conds[i] = WaitForService(StateEventDown, reverseDeps[i]) } - // nolint: errcheck - _ = conditions.WaitForAll(conds...).Wait(shutdownCtx) + allDeps := conditions.WaitForAll(conds...) + if err := allDeps.Wait(shutdownCtx); err != nil { + log.Printf("gave up on %s while stopping %q", allDeps, svcrunner.id) + } svcrunner.Shutdown() }(svcrunner, reverseDependencies[name]) @@ -247,7 +305,7 @@ func (s *singleton) Shutdown() { shutdownWg.Wait() - s.wg.Wait() + return conditions.WaitForAll(stoppedConds...).Wait(ctx) } // List returns snapshot of ServiceRunner instances. @@ -267,44 +325,6 @@ func (s *singleton) List() (result []*ServiceRunner) { return } -// Stop will initiate a shutdown of the specified service. -func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) { - if len(serviceIDs) == 0 { - return - } - - s.mu.Lock() - if s.terminating { - s.mu.Unlock() - return - } - - // Copy current service state - stateCopy := make(map[string]*ServiceRunner) - - for _, id := range serviceIDs { - if _, ok := s.state[id]; !ok { - return fmt.Errorf("service not found: %s", id) - } - - stateCopy[id] = s.state[id] - } - - s.mu.Unlock() - - conds := make([]conditions.Condition, 0, len(stateCopy)) - - // Initiate a shutdown on the specific service - for id, svcrunner := range stateCopy { - svcrunner.Shutdown() - - conds = append(conds, WaitForService(StateEventDown, id)) - } - - // Wait for service to actually shut down - return conditions.WaitForAll(conds...).Wait(ctx) -} - // IsRunning checks service status (started/stopped). // // It doesn't check if service runner was started or not, just pure