From bb4abc0961dba4c2e158cfebdd7b3d8c010a30b3 Mon Sep 17 00:00:00 2001
From: Utku Ozdemir
Date: Tue, 19 Jul 2022 23:42:52 +0200
Subject: [PATCH] fix: regenerate kubelet certs when hostname changes

Clear the kubelet certificates and kubeconfig when the hostname changes,
so that on the next start kubelet goes through the bootstrap process,
new certificates are generated, and the node joins the cluster under
the new name.

Fixes siderolabs/talos#5834.

Signed-off-by: Utku Ozdemir
---
 .../pkg/controllers/k8s/kubelet_service.go  |  94 ++++++++++-
 .../pkg/controllers/k8s/kubelet_spec.go     |  10 +-
 internal/integration/api/update-endpoint.go |  64 +-------
 internal/integration/api/update-hostname.go | 150 ++++++++++++++++++
 internal/integration/base/k8s.go            |  62 ++++++++
 pkg/machinery/resources/k8s/kubelet_spec.go |   9 +-
 6 files changed, 320 insertions(+), 69 deletions(-)
 create mode 100644 internal/integration/api/update-hostname.go

diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go
index 620eb91b4..6615af7cb 100644
--- a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go
+++ b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go
@@ -7,7 +7,9 @@ package k8s
 import (
 	"bytes"
 	"context"
+	"crypto/x509"
 	"encoding/base64"
+	"encoding/pem"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -30,6 +32,7 @@ import (
 	"github.com/talos-systems/talos/internal/app/machined/pkg/system"
 	"github.com/talos-systems/talos/internal/app/machined/pkg/system/services"
 	"github.com/talos-systems/talos/pkg/machinery/constants"
+	"github.com/talos-systems/talos/pkg/machinery/generic/slices"
 	"github.com/talos-systems/talos/pkg/machinery/resources/files"
 	"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
 	runtimeres "github.com/talos-systems/talos/pkg/machinery/resources/runtime"
@@ -187,11 +190,16 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
 		}
 	}
 
-	err = updateKubeconfig(secretSpec.Endpoint)
+	// refresh certs only if we are managing the node name (not overridden by the user)
+	if cfgSpec.ExpectedNodename != "" {
+		err = ctrl.refreshKubeletCerts(cfgSpec.ExpectedNodename)
+		if err != nil {
+			return err
+		}
+	}
 
-	// there is no problem if the file does not exist yet since
-	// it will be created by kubelet on start with correct endpoint
-	if err != nil && !errors.Is(err, os.ErrNotExist) {
+	err = updateKubeconfig(secretSpec.Endpoint)
+	if err != nil {
 		return err
 	}
 
@@ -281,9 +289,13 @@ func (ctrl *KubeletServiceController) writeConfig(cfgSpec *k8s.KubeletSpecSpec)
 	return ioutil.WriteFile("/etc/kubernetes/kubelet.yaml", buf.Bytes(), 0o600)
 }
 
-// updateKubeconfig updates the kubeconfig of kubelet with the given endpoint.
+// updateKubeconfig updates the kubeconfig of kubelet with the given endpoint if it exists.
 func updateKubeconfig(newEndpoint *url.URL) error {
 	config, err := clientcmd.LoadFromFile(constants.KubeletKubeconfig)
+	if errors.Is(err, os.ErrNotExist) {
+		return nil
+	}
+
 	if err != nil {
 		return err
 	}
@@ -297,3 +309,75 @@ func updateKubeconfig(newEndpoint *url.URL) error {
 
 	return clientcmd.WriteToFile(*config, constants.KubeletKubeconfig)
 }
+
+// refreshKubeletCerts checks if the existing kubelet certificates match the node hostname.
+// If they don't match, it clears the certificate directory and removes the kubelet's kubeconfig so that
+// they can be regenerated next time kubelet is started.
+func (ctrl *KubeletServiceController) refreshKubeletCerts(hostname string) error { + cert, err := ctrl.readKubeletCertificate() + if err != nil { + return err + } + + if cert == nil { + return nil + } + + valid := slices.Contains(cert.DNSNames, func(name string) bool { + return name == hostname + }) + + if valid { + // certificate looks good, no need to refresh + return nil + } + + // remove the pki directory + err = os.RemoveAll(constants.KubeletPKIDir) + if err != nil { + return err + } + + // clear the kubelet kubeconfig + err = os.Remove(constants.KubeletKubeconfig) + if errors.Is(err, os.ErrNotExist) { + return nil + } + + return err +} + +func (ctrl *KubeletServiceController) readKubeletCertificate() (*x509.Certificate, error) { + raw, err := os.ReadFile(filepath.Join(constants.KubeletPKIDir, "kubelet.crt")) + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + + if err != nil { + return nil, err + } + + for { + block, rest := pem.Decode(raw) + if block == nil { + return nil, nil + } + + raw = rest + + if block.Type != "CERTIFICATE" { + continue + } + + var cert *x509.Certificate + + cert, err = x509.ParseCertificate(block.Bytes) + if err != nil { + return nil, err + } + + if !cert.IsCA { + return cert, nil + } + } +} diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go index 666b6a559..895444082 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go @@ -106,6 +106,8 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime nodenameSpec := nodename.(*k8s.Nodename).TypedSpec() + expectedNodename := nodenameSpec.Nodename + args := argsbuilder.Args{ "bootstrap-kubeconfig": constants.KubeletBootstrapKubeconfig, "kubeconfig": constants.KubeletKubeconfig, @@ -115,7 +117,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime "cert-dir": constants.KubeletPKIDir, - "hostname-override": nodenameSpec.Nodename, + "hostname-override": expectedNodename, } if cfgSpec.CloudProviderExternal { @@ -124,6 +126,11 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime extraArgs := argsbuilder.Args(cfgSpec.ExtraArgs) + // if the user supplied a hostname override, we do not manage it anymore + if extraArgs.Contains("hostname-override") { + expectedNodename = "" + } + // if the user supplied node-ip via extra args, no need to pick automatically if !extraArgs.Contains("node-ip") { var nodeIP resource.Resource @@ -183,6 +190,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime kubeletSpec.ExtraMounts = cfgSpec.ExtraMounts kubeletSpec.Args = args.Args() kubeletSpec.Config = unstructuredConfig + kubeletSpec.ExpectedNodename = expectedNodename return nil }, diff --git a/internal/integration/api/update-endpoint.go b/internal/integration/api/update-endpoint.go index e993109bf..4ef6b011c 100644 --- a/internal/integration/api/update-endpoint.go +++ b/internal/integration/api/update-endpoint.go @@ -9,14 +9,11 @@ package api import ( "context" - "fmt" "net/url" "testing" "time" - "github.com/talos-systems/go-retry/retry" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/talos-systems/talos/internal/integration/base" machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" @@ -61,6 +58,9 @@ func (suite *UpdateEndpointSuite) TestUpdateControlPlaneEndpoint() { nodeInternalIP := 
suite.RandomDiscoveredNodeInternalIP(machine.TypeControlPlane) + node, err := suite.GetK8sNodeByInternalIP(suite.ctx, nodeInternalIP) + suite.Require().NoError(err) + oldURL := suite.updateEndpointURL(nodeInternalIP, "https://127.0.0.1:40443") nodeReady := func(status corev1.ConditionStatus) bool { @@ -76,15 +76,11 @@ func (suite *UpdateEndpointSuite) TestUpdateControlPlaneEndpoint() { suite.updateEndpointURL(nodeInternalIP, oldURL) // expect node status to be Ready again - suite.Assert().NoError( - suite.waitForNodeReadinessStatus(nodeInternalIP, nodeReady), - ) + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, node.Name, nodeReady)) }() // expect node status to become NotReady - suite.Assert().NoError( - suite.waitForNodeReadinessStatus(nodeInternalIP, nodeNotReady), - ) + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, node.Name, nodeNotReady)) } func (suite *UpdateEndpointSuite) updateEndpointURL(nodeIP string, newURL string) (oldURL string) { @@ -116,56 +112,6 @@ func (suite *UpdateEndpointSuite) updateEndpointURL(nodeIP string, newURL string return } -func (suite *UpdateEndpointSuite) getNodeByInternalIP(internalIP string) (*corev1.Node, error) { - nodeList, err := suite.Clientset.CoreV1().Nodes().List(suite.ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, item := range nodeList.Items { - for _, address := range item.Status.Addresses { - if address.Type == corev1.NodeInternalIP { - if address.Address == internalIP { - return &item, nil - } - } - } - } - - return nil, fmt.Errorf("node with internal IP %s not found", internalIP) -} - -// waitForNodeReadinessStatus waits for node to have the given status. -func (suite *UpdateEndpointSuite) waitForNodeReadinessStatus(nodeInternalIP string, checkFn func(corev1.ConditionStatus) bool) error { - return retry.Constant(5 * time.Minute).Retry(func() error { - readinessStatus, err := suite.getNodeReadinessStatus(nodeInternalIP) - if err != nil { - return err - } - - if !checkFn(readinessStatus) { - return retry.ExpectedError(fmt.Errorf("node readiness status is %s", readinessStatus)) - } - - return nil - }) -} - -func (suite *UpdateEndpointSuite) getNodeReadinessStatus(nodeInternalIP string) (corev1.ConditionStatus, error) { - node, err := suite.getNodeByInternalIP(nodeInternalIP) - if err != nil { - return "", err - } - - for _, condition := range node.Status.Conditions { - if condition.Type == corev1.NodeReady { - return condition.Status, nil - } - } - - return "", fmt.Errorf("node %s has no readiness condition", nodeInternalIP) -} - func init() { allSuites = append(allSuites, new(UpdateEndpointSuite)) } diff --git a/internal/integration/api/update-hostname.go b/internal/integration/api/update-hostname.go new file mode 100644 index 000000000..6dcc8c90a --- /dev/null +++ b/internal/integration/api/update-hostname.go @@ -0,0 +1,150 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +//go:build integration_api +// +build integration_api + +package api + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/talos-systems/talos/internal/integration/base" + machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" + "github.com/talos-systems/talos/pkg/machinery/client" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" +) + +// UpdateHostnameSuite verifies UpdateHostname API. +type UpdateHostnameSuite struct { + base.K8sSuite + + ctx context.Context //nolint:containedctx + ctxCancel context.CancelFunc +} + +// SuiteName ... +func (suite *UpdateHostnameSuite) SuiteName() string { + return "api.UpdateHostnameSuite" +} + +// SetupTest ... +func (suite *UpdateHostnameSuite) SetupTest() { + // make sure API calls have timeout + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 5*time.Minute) +} + +// TearDownTest ... +func (suite *UpdateHostnameSuite) TearDownTest() { + if suite.ctxCancel != nil { + suite.ctxCancel() + } +} + +// TestUpdateHostname updates the hostname of a worker node, +// then asserts that the node re-joins the cluster with the new hostname. +// It reverts the change at the end of the test and asserts that the node is reported again as Ready. +func (suite *UpdateHostnameSuite) TestUpdateHostname() { + if testing.Short() { + suite.T().Skip("skipping in short mode") + } + + if !suite.Capabilities().SupportsReboot { + suite.T().Skip("cluster doesn't support reboot") + } + + nodeInternalIP := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker) + + nodeCtx := client.WithNodes(suite.ctx, nodeInternalIP) + + node, err := suite.GetK8sNodeByInternalIP(suite.ctx, nodeInternalIP) + suite.Require().NoError(err) + + if strings.HasSuffix(node.Name, ".ec2.internal") { + suite.T().Skip("aws does not support hostname changes") + } + + oldHostname := node.Name + + newHostname := "test-update-hostname" + + err = suite.updateHostname(nodeCtx, nodeInternalIP, newHostname) + suite.Require().NoError(err) + + nodeReady := func(status corev1.ConditionStatus) bool { + return status == corev1.ConditionTrue + } + + nodeNotReady := func(status corev1.ConditionStatus) bool { + return status != corev1.ConditionTrue + } + + defer func() { + // revert the hostname back to the original one + err = suite.updateHostname(nodeCtx, nodeInternalIP, oldHostname) + suite.Require().NoError(err) + + // expect node status to be Ready again + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, oldHostname, nodeReady)) + + // Delete the node with the test hostname + err = suite.Clientset.CoreV1().Nodes().Delete(suite.ctx, newHostname, metav1.DeleteOptions{}) + suite.Require().NoError(err) + + // Reboot node for CNI bridge to be reconfigured: https://stackoverflow.com/questions/61373366 + suite.AssertRebooted( + suite.ctx, nodeInternalIP, func(nodeCtx context.Context) error { + return base.IgnoreGRPCUnavailable(suite.Client.Reboot(nodeCtx)) + }, 10*time.Minute, + ) + }() + + // expect node with old hostname to become NotReady + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, oldHostname, nodeNotReady)) + + // expect node with new hostname to become Ready + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, newHostname, nodeReady)) + + // Delete the node with the old hostname + err = 
suite.Clientset.CoreV1().Nodes().Delete(suite.ctx, oldHostname, metav1.DeleteOptions{}) + suite.Require().NoError(err) +} + +func (suite *UpdateHostnameSuite) updateHostname(nodeCtx context.Context, nodeIP string, newHostname string) error { + nodeConfig, err := suite.ReadConfigFromNode(nodeCtx) + if err != nil { + return err + } + + nodeConfigRaw, ok := nodeConfig.Raw().(*v1alpha1.Config) + if !ok { + return fmt.Errorf("unexpected node config type %T", nodeConfig.Raw()) + } + + nodeConfigRaw.MachineConfig.MachineNetwork.NetworkHostname = newHostname + + bytes, err := nodeConfigRaw.Bytes() + if err != nil { + return err + } + + _, err = suite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{ + Data: bytes, + Mode: machineapi.ApplyConfigurationRequest_NO_REBOOT, + }) + + return err +} + +func init() { + allSuites = append(allSuites, new(UpdateHostnameSuite)) +} diff --git a/internal/integration/base/k8s.go b/internal/integration/base/k8s.go index 6ddd74b15..90eba044a 100644 --- a/internal/integration/base/k8s.go +++ b/internal/integration/base/k8s.go @@ -9,8 +9,13 @@ package base import ( "context" + "fmt" "time" + "github.com/talos-systems/go-retry/retry" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -49,3 +54,60 @@ func (k8sSuite *K8sSuite) SetupSuite() { k8sSuite.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(config) k8sSuite.Require().NoError(err) } + +// GetK8sNodeByInternalIP returns the kubernetes node by its internal ip or error if it is not found. +func (k8sSuite *K8sSuite) GetK8sNodeByInternalIP(ctx context.Context, internalIP string) (*corev1.Node, error) { + nodeList, err := k8sSuite.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, item := range nodeList.Items { + for _, address := range item.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + if address.Address == internalIP { + return &item, nil + } + } + } + } + + return nil, fmt.Errorf("node with internal IP %s not found", internalIP) +} + +// WaitForK8sNodeReadinessStatus waits for node to have the given status. +// It retries until the node with the name is found and matches the expected condition. +func (k8sSuite *K8sSuite) WaitForK8sNodeReadinessStatus(ctx context.Context, nodeName string, checkFn func(corev1.ConditionStatus) bool) error { + return retry.Constant(5 * time.Minute).Retry(func() error { + readinessStatus, err := k8sSuite.GetK8sNodeReadinessStatus(ctx, nodeName) + if errors.IsNotFound(err) { + return retry.ExpectedError(err) + } + + if err != nil { + return err + } + + if !checkFn(readinessStatus) { + return retry.ExpectedError(fmt.Errorf("node readiness status is %s", readinessStatus)) + } + + return nil + }) +} + +// GetK8sNodeReadinessStatus returns the node readiness status of the node. 
+func (k8sSuite *K8sSuite) GetK8sNodeReadinessStatus(ctx context.Context, nodeName string) (corev1.ConditionStatus, error) { + node, err := k8sSuite.Clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return "", err + } + + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + return condition.Status, nil + } + } + + return "", fmt.Errorf("node %s has no readiness condition", nodeName) +} diff --git a/pkg/machinery/resources/k8s/kubelet_spec.go b/pkg/machinery/resources/k8s/kubelet_spec.go index b6051180e..9ed277bdb 100644 --- a/pkg/machinery/resources/k8s/kubelet_spec.go +++ b/pkg/machinery/resources/k8s/kubelet_spec.go @@ -19,10 +19,11 @@ type KubeletSpec = typed.Resource[KubeletSpecSpec, KubeletSpecRD] // KubeletSpecSpec holds the source of kubelet configuration. type KubeletSpecSpec struct { - Image string `yaml:"image"` - Args []string `yaml:"args,omitempty"` - ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty"` - Config map[string]interface{} `yaml:"config"` + Image string `yaml:"image"` + Args []string `yaml:"args,omitempty"` + ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty"` + ExpectedNodename string `yaml:"expectedNodename,omitempty"` + Config map[string]interface{} `yaml:"config"` } // NewKubeletSpec initializes an empty KubeletSpec resource.
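
For readers who want to exercise the certificate check outside of the controller, the standalone Go sketch below mirrors the logic of readKubeletCertificate and refreshKubeletCerts from this patch: it decodes PEM blocks, takes the first non-CA certificate, and reports whether the expected hostname appears among its DNS SANs. It is an illustrative sketch, not part of the patch; the literal certificate path and the example hostname are assumptions standing in for constants.KubeletPKIDir and the node's configured hostname.

package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"os"
)

// firstLeafCert returns the first non-CA certificate found in a PEM bundle,
// skipping any other PEM block types, or nil if none is present.
func firstLeafCert(raw []byte) (*x509.Certificate, error) {
	for {
		block, rest := pem.Decode(raw)
		if block == nil {
			return nil, nil
		}

		raw = rest

		if block.Type != "CERTIFICATE" {
			continue
		}

		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			return nil, err
		}

		if !cert.IsCA {
			return cert, nil
		}
	}
}

// matchesHostname reports whether the certificate lists the hostname as a DNS SAN.
func matchesHostname(cert *x509.Certificate, hostname string) bool {
	for _, name := range cert.DNSNames {
		if name == hostname {
			return true
		}
	}

	return false
}

func main() {
	// Assumed inputs: adjust the certificate path and hostname for the node being inspected.
	raw, err := os.ReadFile("/var/lib/kubelet/pki/kubelet.crt")
	if err != nil {
		fmt.Fprintln(os.Stderr, "reading certificate:", err)
		os.Exit(1)
	}

	cert, err := firstLeafCert(raw)
	if err != nil {
		fmt.Fprintln(os.Stderr, "parsing certificate:", err)
		os.Exit(1)
	}

	if cert == nil {
		fmt.Println("no leaf certificate found; kubelet would bootstrap from scratch")

		return
	}

	fmt.Println("DNS SANs:", cert.DNSNames)
	fmt.Println("matches expected hostname:", matchesHostname(cert, "worker-1"))
}

When the SANs do not include the expected hostname, the remedy in this patch is to delete constants.KubeletPKIDir and the kubelet kubeconfig, so that the next kubelet start goes through the bootstrap flow and joins the cluster under the new name.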