diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go index b947e4789..b01b96f70 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go @@ -284,15 +284,15 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph phases = phases.AppendWhen( in.GetGraceful(), "drain", - CordonAndDrainNode, + taskErrorHandler(logError, CordonAndDrainNode), ).AppendWhen( in.GetGraceful(), "cleanup", - RemoveAllPods, + taskErrorHandler(logError, RemoveAllPods), ).AppendWhen( !in.GetGraceful(), "cleanup", - StopAllPods, + taskErrorHandler(logError, StopAllPods), ).Append( "dbus", StopDBus, @@ -301,7 +301,10 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph "leave", LeaveEtcd, ).AppendList( - stopAllPhaselist(r, withKexec), + phaseListErrorHandler(logError, stopAllPhaselist(r, withKexec)...), + ).Append( + "forceCleanup", + ForceCleanup, ).AppendWhen( len(in.GetSystemDiskTargets()) == 0, "reset", diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index 83a314307..422b7c913 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -33,6 +33,7 @@ import ( "github.com/talos-systems/go-blockdevice/blockdevice/partition/gpt" "github.com/talos-systems/go-blockdevice/blockdevice/util" "github.com/talos-systems/go-cmd/pkg/cmd" + "github.com/talos-systems/go-cmd/pkg/cmd/proc" "github.com/talos-systems/go-kmsg" "github.com/talos-systems/go-procfs/procfs" "github.com/talos-systems/go-retry/retry" @@ -50,6 +51,7 @@ import ( "github.com/talos-systems/talos/internal/app/machined/pkg/runtime/v1alpha1/platform" perrors "github.com/talos-systems/talos/internal/app/machined/pkg/runtime/v1alpha1/platform/errors" "github.com/talos-systems/talos/internal/app/machined/pkg/system" + "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" "github.com/talos-systems/talos/internal/app/machined/pkg/system/services" "github.com/talos-systems/talos/internal/app/maintenance" "github.com/talos-systems/talos/internal/pkg/cri" @@ -1248,6 +1250,15 @@ func UnmountSystemDiskBindMounts(seq runtime.Sequence, data interface{}) (runtim // k8s.io namespace. func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { + // skip not exist error as it means that the node hasn't fully joined yet + if _, err = os.Stat("/var/lib/kubelet/pki/kubelet-client-current.pem"); err != nil { + if os.IsNotExist(err) { + return nil + } + + return err + } + var nodename string if nodename, err = r.NodeName(); err != nil { @@ -1299,8 +1310,47 @@ func UncordonNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecution } // LeaveEtcd represents the task for removing a control plane node from etcd. +// +//nolint:gocyclo func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { + _, err = os.Stat(filepath.Join(constants.EtcdDataPath, "/member")) + if err != nil { + if os.IsNotExist(err) { + return nil + } + + return err + } + + etcdID := (&services.Etcd{}).ID(r) + + services := system.Services(r).List() + + shouldLeaveEtcd := false + + for _, service := range services { + if service.AsProto().Id != etcdID { + continue + } + + //nolint:exhaustive + switch service.GetState() { + case events.StateRunning: + fallthrough + case events.StateStopping: + fallthrough + case events.StateFailed: + shouldLeaveEtcd = true + } + + break + } + + if !shouldLeaveEtcd { + return nil + } + client, err := etcd.NewClientFromControlPlaneIPs(ctx, r.State().V1Alpha2().Resources()) if err != nil { return fmt.Errorf("failed to create etcd client: %w", err) @@ -1383,6 +1433,9 @@ func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc { //nolint:errcheck defer client.Close() + ctx, cancel := context.WithTimeout(ctx, time.Minute*3) + defer cancel() + // We remove pods with POD network mode first so that the CNI can perform // any cleanup tasks. If we don't do this, we run the risk of killing the // CNI, preventing the CRI from cleaning up the pod's netwokring. @@ -1973,6 +2026,21 @@ func StopDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc }, "stopDBus" } +// ForceCleanup kills remaining procs and forces partitions unmount. +func ForceCleanup(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { + return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error { + if err := proc.KillAll(); err != nil { + logger.Printf("error killing all procs: %s", err) + } + + if err := mount.UnmountAll(); err != nil { + logger.Printf("error unmounting: %s", err) + } + + return nil + }, "forceCleanup" +} + func pauseOnFailure(callback func(runtime.Sequence, interface{}) (runtime.TaskExecutionFunc, string), timeout time.Duration, ) func(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { @@ -1997,3 +2065,34 @@ func pauseOnFailure(callback func(runtime.Sequence, interface{}) (runtime.TaskEx }, name } } + +func taskErrorHandler(handler func(error, *log.Logger) error, task runtime.TaskSetupFunc) runtime.TaskSetupFunc { + return func(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { + f, name := task(seq, data) + + return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error { + err := f(ctx, logger, r) + if err != nil { + return handler(err, logger) + } + + return nil + }, name + } +} + +func phaseListErrorHandler(handler func(error, *log.Logger) error, phases ...runtime.Phase) PhaseList { + for _, phase := range phases { + for i, task := range phase.Tasks { + phase.Tasks[i] = taskErrorHandler(handler, task) + } + } + + return phases +} + +func logError(err error, logger *log.Logger) error { + logger.Printf("WARNING: task failed: %s", err) + + return nil +}