fix: stop etcd on any path on upgrade

The problem was that etcd stop was only happening in `LeaveEtcd`, thus
upgrade with preserve was never stopping etcd leaving ephemeral
partition still busy.

Refactored code which was stopping service, shutting down all the
services to provide the interface we need:

* stop a service without considering reverse dependencies (force);
* stop a service (services) waiting for reverse dependencies;
* shutdown all the services waiting for reverse dependencies.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2020-10-26 21:07:28 +03:00 committed by talos-bot
parent a7a27e7edd
commit e7f6344d97
2 changed files with 70 additions and 56 deletions

View File

@ -667,13 +667,7 @@ func StartAllServices(seq runtime.Sequence, data interface{}) (runtime.TaskExecu
// StopServicesForUpgrade represents the StopServicesForUpgrade task.
func StopServicesForUpgrade(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
for _, service := range []string{"kubelet", "cri", "udevd"} {
if err = system.Services(nil).Stop(ctx, service); err != nil {
return err
}
}
return nil
return system.Services(nil).StopWithRevDepenencies(ctx, "cri", "etcd", "kubelet", "udevd")
}, "stopServicesForUpgrade"
}
@ -1186,7 +1180,7 @@ func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFun
// RemoveAllPods represents the task for stopping all pods.
func RemoveAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
if err = system.Services(nil).Stop(context.Background(), "kubelet"); err != nil {
if err = system.Services(nil).Stop(ctx, "kubelet"); err != nil {
return err
}

View File

@ -7,6 +7,7 @@ package system
import (
"context"
"fmt"
"log"
"sort"
"sync"
"time"
@ -202,20 +203,71 @@ func (s *singleton) Shutdown() {
return
}
stateCopy := make(map[string]*ServiceRunner)
s.terminating = true
for name, svcrunner := range s.state {
stateCopy[name] = svcrunner
_ = s.stopServices(context.Background(), nil, true) //nolint: errcheck
}
// Stop will initiate a shutdown of the specified service.
func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) {
if len(serviceIDs) == 0 {
return
}
s.mu.Lock()
if s.terminating {
s.mu.Unlock()
return nil
}
return s.stopServices(ctx, serviceIDs, false)
}
// StopWithRevDepenencies will initiate a shutdown of the specified services waiting for reverse dependencies to finish first.
//
// If reverse dependency is not stopped, this method might block waiting on it being stopped forever.
func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...string) (err error) {
if len(serviceIDs) == 0 {
return
}
s.mu.Lock()
if s.terminating {
s.mu.Unlock()
return nil
}
return s.stopServices(ctx, serviceIDs, true)
}
//nolint: gocyclo
func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error {
stateCopy := make(map[string]*ServiceRunner)
if services == nil {
for name, svcrunner := range s.state {
stateCopy[name] = svcrunner
}
} else {
for _, name := range services {
if _, ok := s.state[name]; !ok {
continue
}
stateCopy[name] = s.state[name]
}
}
s.mu.Unlock()
// build reverse dependencies
reverseDependencies := make(map[string][]string)
for name, svcrunner := range stateCopy {
for _, dependency := range svcrunner.service.DependsOn(s.runtime) {
reverseDependencies[dependency] = append(reverseDependencies[dependency], name)
if waitForRevDependencies {
for name, svcrunner := range stateCopy {
for _, dependency := range svcrunner.service.DependsOn(s.runtime) {
reverseDependencies[dependency] = append(reverseDependencies[dependency], name)
}
}
}
@ -223,12 +275,16 @@ func (s *singleton) Shutdown() {
var shutdownWg sync.WaitGroup
// wait max 30 seconds for reverse deps to shut down
shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, 30*time.Second)
defer shutdownCtxCancel()
stoppedConds := []conditions.Condition{}
for name, svcrunner := range stateCopy {
shutdownWg.Add(1)
stoppedConds = append(stoppedConds, WaitForService(StateEventDown, name))
go func(svcrunner *ServiceRunner, reverseDeps []string) {
defer shutdownWg.Done()
@ -238,8 +294,10 @@ func (s *singleton) Shutdown() {
conds[i] = WaitForService(StateEventDown, reverseDeps[i])
}
// nolint: errcheck
_ = conditions.WaitForAll(conds...).Wait(shutdownCtx)
allDeps := conditions.WaitForAll(conds...)
if err := allDeps.Wait(shutdownCtx); err != nil {
log.Printf("gave up on %s while stopping %q", allDeps, svcrunner.id)
}
svcrunner.Shutdown()
}(svcrunner, reverseDependencies[name])
@ -247,7 +305,7 @@ func (s *singleton) Shutdown() {
shutdownWg.Wait()
s.wg.Wait()
return conditions.WaitForAll(stoppedConds...).Wait(ctx)
}
// List returns snapshot of ServiceRunner instances.
@ -267,44 +325,6 @@ func (s *singleton) List() (result []*ServiceRunner) {
return
}
// Stop will initiate a shutdown of the specified service.
func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error) {
if len(serviceIDs) == 0 {
return
}
s.mu.Lock()
if s.terminating {
s.mu.Unlock()
return
}
// Copy current service state
stateCopy := make(map[string]*ServiceRunner)
for _, id := range serviceIDs {
if _, ok := s.state[id]; !ok {
return fmt.Errorf("service not found: %s", id)
}
stateCopy[id] = s.state[id]
}
s.mu.Unlock()
conds := make([]conditions.Condition, 0, len(stateCopy))
// Initiate a shutdown on the specific service
for id, svcrunner := range stateCopy {
svcrunner.Shutdown()
conds = append(conds, WaitForService(StateEventDown, id))
}
// Wait for service to actually shut down
return conditions.WaitForAll(conds...).Wait(ctx)
}
// IsRunning checks service status (started/stopped).
//
// It doesn't check if service runner was started or not, just pure