diff --git a/go.mod b/go.mod
index dd779100a..6fb317bd9 100644
--- a/go.mod
+++ b/go.mod
@@ -68,7 +68,7 @@ require (
 	github.com/talos-systems/go-cmd v0.0.0-20210216164758-68eb0067e0f0
 	github.com/talos-systems/go-loadbalancer v0.1.0
 	github.com/talos-systems/go-procfs v0.0.0-20210108152626-8cbc42d3dc24
-	github.com/talos-systems/go-retry v0.2.0
+	github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133
 	github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e
 	github.com/talos-systems/grpc-proxy v0.2.0
 	github.com/talos-systems/net v0.2.1-0.20210204205549-52c750994376
diff --git a/go.sum b/go.sum
index 446def0ab..38c2039e8 100644
--- a/go.sum
+++ b/go.sum
@@ -869,6 +869,8 @@ github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lI
 github.com/talos-systems/go-retry v0.1.1-0.20201113203059-8c63d290a688/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
 github.com/talos-systems/go-retry v0.2.0 h1:YpQHmtTZ2k0i/bBYRIasdVmF0XaiISVJUOrmZ6FzgLU=
 github.com/talos-systems/go-retry v0.2.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
+github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133 h1:mHnKEViee9x2A6YbsUykwqh7L+tLpm5HTlos2QDlqts=
+github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
 github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e h1:uCp8BfH4Ky2R1XkOKA5pSZpeMMyt0AbH29PIrkoBlaM=
 github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8=
 github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw=
diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go
index 3ef647384..7c4ff7550 100644
--- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go
+++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go
@@ -1242,7 +1242,7 @@ func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExe
 		return err
 	}
 
-	if err = kubeHelper.CordonAndDrain(nodename); err != nil {
+	if err = kubeHelper.CordonAndDrain(ctx, nodename); err != nil {
 		return err
 	}
 
diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go
index a75f443aa..6116f19a1 100644
--- a/pkg/kubernetes/kubernetes.go
+++ b/pkg/kubernetes/kubernetes.go
@@ -11,11 +11,12 @@ import (
 	"fmt"
 	"log"
 	"net/url"
-	"sync"
 	"time"
 
 	"github.com/talos-systems/crypto/x509"
 	"github.com/talos-systems/go-retry/retry"
+	"golang.org/x/sync/errgroup"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -31,6 +32,11 @@ import (
 	"github.com/talos-systems/talos/pkg/machinery/constants"
 )
 
+const (
+	// DrainTimeout is maximum time to wait for the node to be drained.
+	DrainTimeout = 5 * time.Minute
+)
+
 // Client represents a set of helper methods for interacting with the
 // Kubernetes API.
 type Client struct {
@@ -279,18 +285,18 @@ func (h *Client) WaitUntilReady(name string) error {
 }
 
 // CordonAndDrain cordons and drains a node in one call.
-func (h *Client) CordonAndDrain(node string) (err error) {
-	if err = h.Cordon(node); err != nil {
+func (h *Client) CordonAndDrain(ctx context.Context, node string) (err error) {
+	if err = h.Cordon(ctx, node); err != nil {
 		return err
 	}
 
-	return h.Drain(node)
+	return h.Drain(ctx, node)
 }
 
 // Cordon marks a node as unschedulable.
-func (h *Client) Cordon(name string) error {
-	err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).Retry(func() error {
-		node, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
+func (h *Client) Cordon(ctx context.Context, name string) error {
+	err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).RetryWithContext(ctx, func(ctx context.Context) error {
+		node, err := h.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
 			return retry.UnexpectedError(err)
 		}
@@ -302,7 +308,7 @@ func (h *Client) Cordon(name string) error {
 		node.Annotations[constants.AnnotationCordonedKey] = constants.AnnotationCordonedValue
 		node.Spec.Unschedulable = true
 
-		if _, err := h.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+		if _, err := h.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
 			return retry.ExpectedError(err)
 		}
 
@@ -352,58 +358,69 @@ func (h *Client) Uncordon(name string, force bool) error {
 }
 
 // Drain evicts all pods on a given node.
-func (h *Client) Drain(node string) error {
+func (h *Client) Drain(ctx context.Context, node string) error {
+	ctx, cancel := context.WithTimeout(ctx, DrainTimeout)
+	defer cancel()
+
 	opts := metav1.ListOptions{
 		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node}).String(),
 	}
 
-	pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
+	pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
 	if err != nil {
 		return fmt.Errorf("cannot get pods for node %s: %w", node, err)
 	}
 
-	var wg sync.WaitGroup
-
-	wg.Add(len(pods.Items))
+	var eg errgroup.Group
 
 	// Evict each pod.
 	for _, pod := range pods.Items {
-		go func(p corev1.Pod) {
-			defer wg.Done()
+		p := pod
 
-			for _, ref := range p.ObjectMeta.OwnerReferences {
-				if ref.Kind == "DaemonSet" {
-					log.Printf("skipping DaemonSet pod %s\n", p.GetName())
+		eg.Go(func() error {
+			if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok {
+				log.Printf("skipping mirror pod %s/%s\n", p.GetNamespace(), p.GetName())
 
-					return
-				}
-
-				if ref.Kind == "Node" {
-					log.Printf("skipping StaticPod pod %s\n", p.GetName())
-
-					return
-				}
+				return nil
 			}
 
-			if err := h.evict(p, int64(60)); err != nil {
+			controllerRef := metav1.GetControllerOf(&p)
+
+			if controllerRef == nil {
+				log.Printf("skipping unmanaged pod %s/%s\n", p.GetNamespace(), p.GetName())
+
+				return nil
+			}
+
+			if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
+				log.Printf("skipping DaemonSet pod %s/%s\n", p.GetNamespace(), p.GetName())
+
+				return nil
+			}
+
+			if !p.DeletionTimestamp.IsZero() {
+				log.Printf("skipping deleted pod %s/%s\n", p.GetNamespace(), p.GetName())
+			}
+
+			if err := h.evict(ctx, p, int64(60)); err != nil {
 				log.Printf("WARNING: failed to evict pod: %v", err)
 			}
-		}(pod)
+
+			return nil
+		})
 	}
 
-	wg.Wait()
-
-	return nil
+	return eg.Wait()
 }
 
-func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
+func (h *Client) evict(ctx context.Context, p corev1.Pod, gracePeriod int64) error {
 	for {
 		pol := &policy.Eviction{
 			ObjectMeta:    metav1.ObjectMeta{Namespace: p.GetNamespace(), Name: p.GetName()},
 			DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod},
 		}
 
-		err := h.CoreV1().Pods(p.GetNamespace()).Evict(context.TODO(), pol)
+		err := h.CoreV1().Pods(p.GetNamespace()).Evict(ctx, pol)
 
 		switch {
 		case apierrors.IsTooManyRequests(err):
@@ -413,16 +430,18 @@ func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
 		case err != nil:
 			return fmt.Errorf("failed to evict pod %s/%s: %w", p.GetNamespace(), p.GetName(), err)
 		default:
-			if err = h.waitForPodDeleted(&p); err != nil {
+			if err = h.waitForPodDeleted(ctx, &p); err != nil {
 				return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", p.GetNamespace(), p.GetName(), err)
 			}
+
+			return nil
 		}
 	}
 }
 
-func (h *Client) waitForPodDeleted(p *corev1.Pod) error {
-	return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).Retry(func() error {
-		pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(context.TODO(), p.GetName(), metav1.GetOptions{})
+func (h *Client) waitForPodDeleted(ctx context.Context, p *corev1.Pod) error {
+	return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).RetryWithContext(ctx, func(ctx context.Context) error {
+		pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(ctx, p.GetName(), metav1.GetOptions{})
 		switch {
 		case apierrors.IsNotFound(err):
 			return nil
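
Reviewer note: the heart of this change is the rewritten `Drain`, which swaps the bare `sync.WaitGroup` for an `errgroup.Group` and bounds the whole drain with a timeout. Below is a minimal, self-contained sketch of that pattern; the `pod` struct, `drainTimeout`, and `evictPod` are hypothetical stand-ins for the client-go types and calls, not the Talos API.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

const drainTimeout = 5 * time.Minute // mirrors DrainTimeout in the diff

// pod is a trimmed-down stand-in for corev1.Pod.
type pod struct {
	Namespace, Name string
	IsMirror        bool // static pod mirrored into the API server
	IsDaemonSet     bool // controlled by a DaemonSet
}

func drain(ctx context.Context, pods []pod) error {
	// Bound the whole drain; the same ctx flows into every eviction,
	// so hitting the deadline also cancels in-flight work.
	ctx, cancel := context.WithTimeout(ctx, drainTimeout)
	defer cancel()

	var eg errgroup.Group

	for _, p := range pods {
		p := p // capture the loop variable (pre-Go 1.22 semantics)

		eg.Go(func() error {
			// Mirror (static) pods and DaemonSet pods cannot be
			// meaningfully evicted, so they are skipped, as in the diff.
			if p.IsMirror || p.IsDaemonSet {
				log.Printf("skipping pod %s/%s", p.Namespace, p.Name)

				return nil
			}

			if err := evictPod(ctx, p); err != nil {
				// The diff logs and keeps going rather than failing
				// the whole drain on one stubborn pod.
				log.Printf("WARNING: failed to evict pod: %v", err)
			}

			return nil
		})
	}

	// Unlike wg.Wait(), eg.Wait() would also surface the first non-nil
	// error if the goroutines chose to return one.
	return eg.Wait()
}

// evictPod stands in for the Eviction API call; it only honors cancellation.
func evictPod(ctx context.Context, p pod) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("evicting %s/%s: %w", p.Namespace, p.Name, ctx.Err())
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func main() {
	pods := []pod{
		{Namespace: "kube-system", Name: "kube-proxy-xyz", IsDaemonSet: true},
		{Namespace: "default", Name: "web-0"},
	}

	fmt.Println(drain(context.Background(), pods))
}
```

Since per-pod failures are only logged, `eg.Wait()` here is mostly a structured join; the design leaves the door open to propagating eviction errors later without reworking the concurrency.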
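The go.mod bump to the pre-release go-retry pins the commit that adds `RetryWithContext`, which `Cordon` and `waitForPodDeleted` now use so their retry loops stop with the caller's context instead of running out their full budget. A rough sketch of that contract, using only the calls visible in this diff (`checkNode` is a made-up callback standing in for `Nodes().Get`):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/talos-systems/go-retry/retry"
)

// errNotReady simulates a transient condition worth retrying.
var errNotReady = errors.New("node not ready yet")

// checkNode is a hypothetical callback; the real callers wrap
// Nodes().Get / Nodes().Update.
func checkNode(ctx context.Context) error {
	return errNotReady
}

func main() {
	// The caller's deadline (2s) is far shorter than the retry budget (30s).
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	err := retry.Exponential(30*time.Second,
		retry.WithUnits(250*time.Millisecond),
		retry.WithJitter(50*time.Millisecond),
	).RetryWithContext(ctx, func(ctx context.Context) error {
		if err := checkNode(ctx); err != nil {
			// ExpectedError means "keep retrying";
			// UnexpectedError aborts immediately.
			return retry.ExpectedError(err)
		}

		return nil
	})

	// With the old Retry() this loop would have spun for the full
	// 30 seconds; RetryWithContext gives up once ctx expires (~2s).
	fmt.Println(err)
}
```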
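Also worth calling out: `evict` previously never left its `for` loop after a successful eviction, so the added `return nil` in the `default` branch is a real fix, not a refactor. A condensed sketch of that loop, with `evictOnce` and `waitDeleted` as hypothetical stand-ins for the Eviction API call and `waitForPodDeleted`, and an arbitrary backoff interval:

```go
package main

import (
	"context"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

var attempts int

// evictOnce stands in for Pods(ns).Evict(ctx, ...); the first two calls
// simulate a PodDisruptionBudget rejecting the eviction with HTTP 429.
func evictOnce(ctx context.Context, namespace, name string) error {
	if attempts++; attempts < 3 {
		return apierrors.NewTooManyRequests("disruption budget exhausted", 1)
	}

	return nil
}

// waitDeleted stands in for the waitForPodDeleted helper.
func waitDeleted(ctx context.Context, namespace, name string) error {
	return nil
}

func evict(ctx context.Context, namespace, name string) error {
	for {
		err := evictOnce(ctx, namespace, name)

		switch {
		case apierrors.IsTooManyRequests(err):
			// A PDB is blocking the eviction: back off and try again.
			time.Sleep(100 * time.Millisecond)
		case err != nil:
			return fmt.Errorf("failed to evict pod %s/%s: %w", namespace, name, err)
		default:
			if err := waitDeleted(ctx, namespace, name); err != nil {
				return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", namespace, name, err)
			}

			// The one-line fix: without this, a successful eviction
			// looped straight back into another Evict call.
			return nil
		}
	}
}

func main() {
	fmt.Println(evict(context.Background(), "default", "web-0"))
}
```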