From 779ac74a08ae1384875e1db0e98ff346ba24fd03 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Thu, 25 Feb 2021 16:40:50 +0300 Subject: [PATCH] fix: improve the drain function The critical bug (I believe) was that the drain code re-entered the eviction loop after the wait for the pod to be deleted returned success, effectively evicting the pod once again after it got rescheduled to a different node. Add a global timeout to prevent the draining code from running forever. Filter out more pod types which should never be drained. Fixes #3124 Signed-off-by: Andrey Smirnov --- go.mod | 2 +- go.sum | 2 + .../v1alpha1/v1alpha1_sequencer_tasks.go | 2 +- pkg/kubernetes/kubernetes.go | 93 +++++++++++-------- 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index dd779100a..6fb317bd9 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/talos-systems/go-cmd v0.0.0-20210216164758-68eb0067e0f0 github.com/talos-systems/go-loadbalancer v0.1.0 github.com/talos-systems/go-procfs v0.0.0-20210108152626-8cbc42d3dc24 - github.com/talos-systems/go-retry v0.2.0 + github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133 github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e github.com/talos-systems/grpc-proxy v0.2.0 github.com/talos-systems/net v0.2.1-0.20210204205549-52c750994376 diff --git a/go.sum b/go.sum index 446def0ab..38c2039e8 100644 --- a/go.sum +++ b/go.sum @@ -869,6 +869,8 @@ github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lI github.com/talos-systems/go-retry v0.1.1-0.20201113203059-8c63d290a688/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= github.com/talos-systems/go-retry v0.2.0 h1:YpQHmtTZ2k0i/bBYRIasdVmF0XaiISVJUOrmZ6FzgLU= github.com/talos-systems/go-retry v0.2.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= +github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133 h1:mHnKEViee9x2A6YbsUykwqh7L+tLpm5HTlos2QDlqts= +github.com/talos-systems/go-retry 
v0.2.1-0.20210119124456-b9dc1a990133/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e h1:uCp8BfH4Ky2R1XkOKA5pSZpeMMyt0AbH29PIrkoBlaM= github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8= github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw= diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index 3ef647384..7c4ff7550 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -1242,7 +1242,7 @@ func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExe return err } - if err = kubeHelper.CordonAndDrain(nodename); err != nil { + if err = kubeHelper.CordonAndDrain(ctx, nodename); err != nil { return err } diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go index a75f443aa..6116f19a1 100644 --- a/pkg/kubernetes/kubernetes.go +++ b/pkg/kubernetes/kubernetes.go @@ -11,11 +11,12 @@ import ( "fmt" "log" "net/url" - "sync" "time" "github.com/talos-systems/crypto/x509" "github.com/talos-systems/go-retry/retry" + "golang.org/x/sync/errgroup" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,6 +32,11 @@ import ( "github.com/talos-systems/talos/pkg/machinery/constants" ) +const ( + // DrainTimeout is maximum time to wait for the node to be drained. + DrainTimeout = 5 * time.Minute +) + // Client represents a set of helper methods for interacting with the // Kubernetes API. type Client struct { @@ -279,18 +285,18 @@ func (h *Client) WaitUntilReady(name string) error { } // CordonAndDrain cordons and drains a node in one call. 
-func (h *Client) CordonAndDrain(node string) (err error) { - if err = h.Cordon(node); err != nil { +func (h *Client) CordonAndDrain(ctx context.Context, node string) (err error) { + if err = h.Cordon(ctx, node); err != nil { return err } - return h.Drain(node) + return h.Drain(ctx, node) } // Cordon marks a node as unschedulable. -func (h *Client) Cordon(name string) error { - err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).Retry(func() error { - node, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) +func (h *Client) Cordon(ctx context.Context, name string) error { + err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).RetryWithContext(ctx, func(ctx context.Context) error { + node, err := h.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{}) if err != nil { return retry.UnexpectedError(err) } @@ -302,7 +308,7 @@ func (h *Client) Cordon(name string) error { node.Annotations[constants.AnnotationCordonedKey] = constants.AnnotationCordonedValue node.Spec.Unschedulable = true - if _, err := h.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil { + if _, err := h.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil { return retry.ExpectedError(err) } @@ -352,58 +358,69 @@ func (h *Client) Uncordon(name string, force bool) error { } // Drain evicts all pods on a given node. 
-func (h *Client) Drain(node string) error { +func (h *Client) Drain(ctx context.Context, node string) error { + ctx, cancel := context.WithTimeout(ctx, DrainTimeout) + defer cancel() + opts := metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node}).String(), } - pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts) + pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts) if err != nil { return fmt.Errorf("cannot get pods for node %s: %w", node, err) } - var wg sync.WaitGroup - - wg.Add(len(pods.Items)) + var eg errgroup.Group // Evict each pod. for _, pod := range pods.Items { - go func(p corev1.Pod) { - defer wg.Done() + p := pod - for _, ref := range p.ObjectMeta.OwnerReferences { - if ref.Kind == "DaemonSet" { - log.Printf("skipping DaemonSet pod %s\n", p.GetName()) + eg.Go(func() error { + if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok { + log.Printf("skipping mirror pod %s/%s\n", p.GetNamespace(), p.GetName()) - return - } - - if ref.Kind == "Node" { - log.Printf("skipping StaticPod pod %s\n", p.GetName()) - - return - } + return nil } - if err := h.evict(p, int64(60)); err != nil { + controllerRef := metav1.GetControllerOf(&p) + + if controllerRef == nil { + log.Printf("skipping unmanaged pod %s/%s\n", p.GetNamespace(), p.GetName()) + + return nil + } + + if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind { + log.Printf("skipping DaemonSet pod %s/%s\n", p.GetNamespace(), p.GetName()) + + return nil + } + + if !p.DeletionTimestamp.IsZero() { + log.Printf("skipping deleted pod %s/%s\n", p.GetNamespace(), p.GetName()) + } + + if err := h.evict(ctx, p, int64(60)); err != nil { log.Printf("WARNING: failed to evict pod: %v", err) } - }(pod) + + return nil + }) } - wg.Wait() - - return nil + return eg.Wait() } -func (h *Client) evict(p corev1.Pod, gracePeriod int64) error { +func (h *Client) evict(ctx context.Context, p corev1.Pod, gracePeriod 
int64) error { for { pol := &policy.Eviction{ ObjectMeta: metav1.ObjectMeta{Namespace: p.GetNamespace(), Name: p.GetName()}, DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}, } - err := h.CoreV1().Pods(p.GetNamespace()).Evict(context.TODO(), pol) + err := h.CoreV1().Pods(p.GetNamespace()).Evict(ctx, pol) switch { case apierrors.IsTooManyRequests(err): @@ -413,16 +430,18 @@ func (h *Client) evict(p corev1.Pod, gracePeriod int64) error { case err != nil: return fmt.Errorf("failed to evict pod %s/%s: %w", p.GetNamespace(), p.GetName(), err) default: - if err = h.waitForPodDeleted(&p); err != nil { + if err = h.waitForPodDeleted(ctx, &p); err != nil { return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", p.GetNamespace(), p.GetName(), err) } + + return nil } } } -func (h *Client) waitForPodDeleted(p *corev1.Pod) error { - return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).Retry(func() error { - pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(context.TODO(), p.GetName(), metav1.GetOptions{}) +func (h *Client) waitForPodDeleted(ctx context.Context, p *corev1.Pod) error { + return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).RetryWithContext(ctx, func(ctx context.Context) error { + pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(ctx, p.GetName(), metav1.GetOptions{}) switch { case apierrors.IsNotFound(err): return nil