fix: make reset work even if the node is not bootstrapped/not joined

Now the sequencer is smart enough to skip `LeaveEtcd` and
`CordonAndDrainNode` if the node has not fully joined the cluster
yet.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
Artem Chernyshev 2022-08-01 21:10:36 +03:00
parent a6b010a8b4
commit e3d4a0e4d1
2 changed files with 106 additions and 4 deletions
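The change boils down to a pair of file-presence probes: `CordonAndDrainNode` only runs once the kubelet has obtained its client certificate, and `LeaveEtcd` only runs if the etcd data directory contains a `member` directory. A minimal standalone sketch of that guard is below; the two paths come from the diff, while the combined helper `nodeHasJoined` is purely illustrative and assumes `constants.EtcdDataPath` resolves to `/var/lib/etcd`.

```go
// Sketch only: nodeHasJoined is a hypothetical helper that combines the two
// separate checks added by this commit into one function for illustration.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

const (
	// written by the kubelet once its TLS bootstrap has completed
	kubeletClientCert = "/var/lib/kubelet/pki/kubelet-client-current.pem"
	// assumption: constants.EtcdDataPath resolves to this directory
	etcdDataPath = "/var/lib/etcd"
)

func nodeHasJoined() (bool, error) {
	for _, path := range []string{kubeletClientCert, filepath.Join(etcdDataPath, "member")} {
		if _, err := os.Stat(path); err != nil {
			if os.IsNotExist(err) {
				return false, nil // not joined yet: skip the teardown task
			}

			return false, err // any other error still fails the task
		}
	}

	return true, nil
}

func main() {
	joined, err := nodeHasJoined()
	fmt.Printf("joined: %v, err: %v\n", joined, err)
}
```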

View File

@@ -284,15 +284,15 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
phases = phases.AppendWhen(
in.GetGraceful(),
"drain",
CordonAndDrainNode,
taskErrorHandler(logError, CordonAndDrainNode),
).AppendWhen(
in.GetGraceful(),
"cleanup",
RemoveAllPods,
taskErrorHandler(logError, RemoveAllPods),
).AppendWhen(
!in.GetGraceful(),
"cleanup",
StopAllPods,
taskErrorHandler(logError, StopAllPods),
).Append(
"dbus",
StopDBus,
@@ -301,7 +301,10 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
"leave",
LeaveEtcd,
).AppendList(
stopAllPhaselist(r, withKexec),
phaseListErrorHandler(logError, stopAllPhaselist(r, withKexec)...),
).Append(
"forceCleanup",
ForceCleanup,
).AppendWhen(
len(in.GetSystemDiskTargets()) == 0,
"reset",

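The `Reset` sequence above relies on the phase builder's conditional append: graceful-only phases such as "drain" are simply left out of a non-graceful reset. A simplified sketch of that pattern, with hypothetical stand-ins for `runtime.Phase`/`PhaseList` (the real types carry `runtime.TaskSetupFunc` tasks, not strings):

```go
// Simplified sketch of the Append/AppendWhen pattern used by the Reset sequence.
package main

import "fmt"

type Task string

type Phase struct {
	Name  string
	Tasks []Task
}

type PhaseList []Phase

// Append always adds the phase.
func (p PhaseList) Append(name string, tasks ...Task) PhaseList {
	return append(p, Phase{Name: name, Tasks: tasks})
}

// AppendWhen adds the phase only when the condition holds, so graceful-only
// steps like "drain" drop out of a non-graceful reset.
func (p PhaseList) AppendWhen(when bool, name string, tasks ...Task) PhaseList {
	if !when {
		return p
	}

	return p.Append(name, tasks...)
}

func main() {
	graceful := false

	phases := PhaseList{}.
		AppendWhen(graceful, "drain", "CordonAndDrainNode").
		AppendWhen(graceful, "cleanup", "RemoveAllPods").
		AppendWhen(!graceful, "cleanup", "StopAllPods").
		Append("dbus", "StopDBus")

	for _, ph := range phases {
		fmt.Println(ph.Name, ph.Tasks)
	}
}
```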
View File

@@ -33,6 +33,7 @@ import (
"github.com/talos-systems/go-blockdevice/blockdevice/partition/gpt"
"github.com/talos-systems/go-blockdevice/blockdevice/util"
"github.com/talos-systems/go-cmd/pkg/cmd"
"github.com/talos-systems/go-cmd/pkg/cmd/proc"
"github.com/talos-systems/go-kmsg"
"github.com/talos-systems/go-procfs/procfs"
"github.com/talos-systems/go-retry/retry"
@@ -50,6 +51,7 @@ import (
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime/v1alpha1/platform"
perrors "github.com/talos-systems/talos/internal/app/machined/pkg/runtime/v1alpha1/platform/errors"
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/services"
"github.com/talos-systems/talos/internal/app/maintenance"
"github.com/talos-systems/talos/internal/pkg/cri"
@@ -1248,6 +1250,15 @@ func UnmountSystemDiskBindMounts(seq runtime.Sequence, data interface{}) (runtim
// k8s.io namespace.
func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
// skip not exist error as it means that the node hasn't fully joined yet
if _, err = os.Stat("/var/lib/kubelet/pki/kubelet-client-current.pem"); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
var nodename string
if nodename, err = r.NodeName(); err != nil {
@@ -1299,8 +1310,47 @@ func UncordonNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
}
// LeaveEtcd represents the task for removing a control plane node from etcd.
//
//nolint:gocyclo
func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
_, err = os.Stat(filepath.Join(constants.EtcdDataPath, "/member"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
etcdID := (&services.Etcd{}).ID(r)
services := system.Services(r).List()
shouldLeaveEtcd := false
for _, service := range services {
if service.AsProto().Id != etcdID {
continue
}
//nolint:exhaustive
switch service.GetState() {
case events.StateRunning:
fallthrough
case events.StateStopping:
fallthrough
case events.StateFailed:
shouldLeaveEtcd = true
}
break
}
if !shouldLeaveEtcd {
return nil
}
client, err := etcd.NewClientFromControlPlaneIPs(ctx, r.State().V1Alpha2().Resources())
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
@@ -1383,6 +1433,9 @@ func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc {
//nolint:errcheck
defer client.Close()
ctx, cancel := context.WithTimeout(ctx, time.Minute*3)
defer cancel()
// We remove pods with POD network mode first so that the CNI can perform
// any cleanup tasks. If we don't do this, we run the risk of killing the
// CNI, preventing the CRI from cleaning up the pod's networking.
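The ordering described in that comment can be illustrated with a tiny sketch (hypothetical types, not the actual `cri` package API): pods attached to the pod network are stopped in a first pass, and host-network pods, which typically include the CNI components, only in a second pass.

```go
// Sketch only: stop pod-network pods first so the CNI (usually host-network)
// is still alive to tear down their networking.
package main

import "fmt"

type pod struct {
	name        string
	hostNetwork bool
}

func stopAll(pods []pod) {
	// first pass: pod-network pods; second pass: host-network pods.
	for _, hostNetwork := range []bool{false, true} {
		for _, p := range pods {
			if p.hostNetwork == hostNetwork {
				fmt.Println("stopping", p.name)
			}
		}
	}
}

func main() {
	stopAll([]pod{
		{name: "cni-plugin", hostNetwork: true},
		{name: "nginx", hostNetwork: false},
	})
}
```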
@@ -1973,6 +2026,21 @@ func StopDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc
}, "stopDBus"
}
// ForceCleanup kills remaining procs and forces partitions unmount.
func ForceCleanup(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
if err := proc.KillAll(); err != nil {
logger.Printf("error killing all procs: %s", err)
}
if err := mount.UnmountAll(); err != nil {
logger.Printf("error unmounting: %s", err)
}
return nil
}, "forceCleanup"
}
func pauseOnFailure(callback func(runtime.Sequence, interface{}) (runtime.TaskExecutionFunc, string),
timeout time.Duration,
) func(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
@@ -1997,3 +2065,34 @@ func pauseOnFailure(callback func(runtime.Sequence, interface{}) (runtime.TaskEx
}, name
}
}
func taskErrorHandler(handler func(error, *log.Logger) error, task runtime.TaskSetupFunc) runtime.TaskSetupFunc {
return func(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
f, name := task(seq, data)
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
err := f(ctx, logger, r)
if err != nil {
return handler(err, logger)
}
return nil
}, name
}
}
func phaseListErrorHandler(handler func(error, *log.Logger) error, phases ...runtime.Phase) PhaseList {
for _, phase := range phases {
for i, task := range phase.Tasks {
phase.Tasks[i] = taskErrorHandler(handler, task)
}
}
return phases
}
func logError(err error, logger *log.Logger) error {
logger.Printf("WARNING: task failed: %s", err)
return nil
}