diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go index 620eb91b4..6615af7cb 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go @@ -7,7 +7,9 @@ package k8s import ( "bytes" "context" + "crypto/x509" "encoding/base64" + "encoding/pem" "errors" "fmt" "io/ioutil" @@ -30,6 +32,7 @@ import ( "github.com/talos-systems/talos/internal/app/machined/pkg/system" "github.com/talos-systems/talos/internal/app/machined/pkg/system/services" "github.com/talos-systems/talos/pkg/machinery/constants" + "github.com/talos-systems/talos/pkg/machinery/generic/slices" "github.com/talos-systems/talos/pkg/machinery/resources/files" "github.com/talos-systems/talos/pkg/machinery/resources/k8s" runtimeres "github.com/talos-systems/talos/pkg/machinery/resources/runtime" @@ -187,11 +190,16 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt } } - err = updateKubeconfig(secretSpec.Endpoint) + // refresh certs only if we are managing the node name (not overridden by the user) + if cfgSpec.ExpectedNodename != "" { + err = ctrl.refreshKubeletCerts(cfgSpec.ExpectedNodename) + if err != nil { + return err + } + } - // there is no problem if the file does not exist yet since - // it will be created by kubelet on start with correct endpoint - if err != nil && !errors.Is(err, os.ErrNotExist) { + err = updateKubeconfig(secretSpec.Endpoint) + if err != nil { return err } @@ -281,9 +289,13 @@ func (ctrl *KubeletServiceController) writeConfig(cfgSpec *k8s.KubeletSpecSpec) return ioutil.WriteFile("/etc/kubernetes/kubelet.yaml", buf.Bytes(), 0o600) } -// updateKubeconfig updates the kubeconfig of kubelet with the given endpoint. +// updateKubeconfig updates the kubeconfig of kubelet with the given endpoint if it exists. 
func updateKubeconfig(newEndpoint *url.URL) error { config, err := clientcmd.LoadFromFile(constants.KubeletKubeconfig) + if errors.Is(err, os.ErrNotExist) { + return nil + } + if err != nil { return err } @@ -297,3 +309,75 @@ func updateKubeconfig(newEndpoint *url.URL) error { return clientcmd.WriteToFile(*config, constants.KubeletKubeconfig) } + +// refreshKubeletCerts checks if the existing kubelet certificates match the node hostname. +// If they don't match, it clears the certificate directory and removes the kubelet's kubeconfig so that +// they can be regenerated next time kubelet is started. +func (ctrl *KubeletServiceController) refreshKubeletCerts(hostname string) error { + cert, err := ctrl.readKubeletCertificate() + if err != nil { + return err + } + + if cert == nil { + return nil + } + + valid := slices.Contains(cert.DNSNames, func(name string) bool { + return name == hostname + }) + + if valid { + // certificate looks good, no need to refresh + return nil + } + + // remove the pki directory + err = os.RemoveAll(constants.KubeletPKIDir) + if err != nil { + return err + } + + // clear the kubelet kubeconfig + err = os.Remove(constants.KubeletKubeconfig) + if errors.Is(err, os.ErrNotExist) { + return nil + } + + return err +} + +func (ctrl *KubeletServiceController) readKubeletCertificate() (*x509.Certificate, error) { + raw, err := os.ReadFile(filepath.Join(constants.KubeletPKIDir, "kubelet.crt")) + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + + if err != nil { + return nil, err + } + + for { + block, rest := pem.Decode(raw) + if block == nil { + return nil, nil + } + + raw = rest + + if block.Type != "CERTIFICATE" { + continue + } + + var cert *x509.Certificate + + cert, err = x509.ParseCertificate(block.Bytes) + if err != nil { + return nil, err + } + + if !cert.IsCA { + return cert, nil + } + } +} diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go index 
666b6a559..895444082 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go @@ -106,6 +106,8 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime nodenameSpec := nodename.(*k8s.Nodename).TypedSpec() + expectedNodename := nodenameSpec.Nodename + args := argsbuilder.Args{ "bootstrap-kubeconfig": constants.KubeletBootstrapKubeconfig, "kubeconfig": constants.KubeletKubeconfig, @@ -115,7 +117,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime "cert-dir": constants.KubeletPKIDir, - "hostname-override": nodenameSpec.Nodename, + "hostname-override": expectedNodename, } if cfgSpec.CloudProviderExternal { @@ -124,6 +126,11 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime extraArgs := argsbuilder.Args(cfgSpec.ExtraArgs) + // if the user supplied a hostname override, we do not manage it anymore + if extraArgs.Contains("hostname-override") { + expectedNodename = "" + } + // if the user supplied node-ip via extra args, no need to pick automatically if !extraArgs.Contains("node-ip") { var nodeIP resource.Resource @@ -183,6 +190,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime kubeletSpec.ExtraMounts = cfgSpec.ExtraMounts kubeletSpec.Args = args.Args() kubeletSpec.Config = unstructuredConfig + kubeletSpec.ExpectedNodename = expectedNodename return nil }, diff --git a/internal/integration/api/update-endpoint.go b/internal/integration/api/update-endpoint.go index e993109bf..4ef6b011c 100644 --- a/internal/integration/api/update-endpoint.go +++ b/internal/integration/api/update-endpoint.go @@ -9,14 +9,11 @@ package api import ( "context" - "fmt" "net/url" "testing" "time" - "github.com/talos-systems/go-retry/retry" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/talos-systems/talos/internal/integration/base" machineapi 
"github.com/talos-systems/talos/pkg/machinery/api/machine" @@ -61,6 +58,9 @@ func (suite *UpdateEndpointSuite) TestUpdateControlPlaneEndpoint() { nodeInternalIP := suite.RandomDiscoveredNodeInternalIP(machine.TypeControlPlane) + node, err := suite.GetK8sNodeByInternalIP(suite.ctx, nodeInternalIP) + suite.Require().NoError(err) + oldURL := suite.updateEndpointURL(nodeInternalIP, "https://127.0.0.1:40443") nodeReady := func(status corev1.ConditionStatus) bool { @@ -76,15 +76,11 @@ func (suite *UpdateEndpointSuite) TestUpdateControlPlaneEndpoint() { suite.updateEndpointURL(nodeInternalIP, oldURL) // expect node status to be Ready again - suite.Assert().NoError( - suite.waitForNodeReadinessStatus(nodeInternalIP, nodeReady), - ) + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, node.Name, nodeReady)) }() // expect node status to become NotReady - suite.Assert().NoError( - suite.waitForNodeReadinessStatus(nodeInternalIP, nodeNotReady), - ) + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, node.Name, nodeNotReady)) } func (suite *UpdateEndpointSuite) updateEndpointURL(nodeIP string, newURL string) (oldURL string) { @@ -116,56 +112,6 @@ func (suite *UpdateEndpointSuite) updateEndpointURL(nodeIP string, newURL string return } -func (suite *UpdateEndpointSuite) getNodeByInternalIP(internalIP string) (*corev1.Node, error) { - nodeList, err := suite.Clientset.CoreV1().Nodes().List(suite.ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, item := range nodeList.Items { - for _, address := range item.Status.Addresses { - if address.Type == corev1.NodeInternalIP { - if address.Address == internalIP { - return &item, nil - } - } - } - } - - return nil, fmt.Errorf("node with internal IP %s not found", internalIP) -} - -// waitForNodeReadinessStatus waits for node to have the given status. 
-func (suite *UpdateEndpointSuite) waitForNodeReadinessStatus(nodeInternalIP string, checkFn func(corev1.ConditionStatus) bool) error { - return retry.Constant(5 * time.Minute).Retry(func() error { - readinessStatus, err := suite.getNodeReadinessStatus(nodeInternalIP) - if err != nil { - return err - } - - if !checkFn(readinessStatus) { - return retry.ExpectedError(fmt.Errorf("node readiness status is %s", readinessStatus)) - } - - return nil - }) -} - -func (suite *UpdateEndpointSuite) getNodeReadinessStatus(nodeInternalIP string) (corev1.ConditionStatus, error) { - node, err := suite.getNodeByInternalIP(nodeInternalIP) - if err != nil { - return "", err - } - - for _, condition := range node.Status.Conditions { - if condition.Type == corev1.NodeReady { - return condition.Status, nil - } - } - - return "", fmt.Errorf("node %s has no readiness condition", nodeInternalIP) -} - func init() { allSuites = append(allSuites, new(UpdateEndpointSuite)) } diff --git a/internal/integration/api/update-hostname.go b/internal/integration/api/update-hostname.go new file mode 100644 index 000000000..6dcc8c90a --- /dev/null +++ b/internal/integration/api/update-hostname.go @@ -0,0 +1,150 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +//go:build integration_api +// +build integration_api + +package api + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/talos-systems/talos/internal/integration/base" + machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" + "github.com/talos-systems/talos/pkg/machinery/client" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1" + "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" +) + +// UpdateHostnameSuite verifies UpdateHostname API. +type UpdateHostnameSuite struct { + base.K8sSuite + + ctx context.Context //nolint:containedctx + ctxCancel context.CancelFunc +} + +// SuiteName ... +func (suite *UpdateHostnameSuite) SuiteName() string { + return "api.UpdateHostnameSuite" +} + +// SetupTest ... +func (suite *UpdateHostnameSuite) SetupTest() { + // make sure API calls have timeout + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 5*time.Minute) +} + +// TearDownTest ... +func (suite *UpdateHostnameSuite) TearDownTest() { + if suite.ctxCancel != nil { + suite.ctxCancel() + } +} + +// TestUpdateHostname updates the hostname of a worker node, +// then asserts that the node re-joins the cluster with the new hostname. +// It reverts the change at the end of the test and asserts that the node is reported again as Ready. 
+func (suite *UpdateHostnameSuite) TestUpdateHostname() { + if testing.Short() { + suite.T().Skip("skipping in short mode") + } + + if !suite.Capabilities().SupportsReboot { + suite.T().Skip("cluster doesn't support reboot") + } + + nodeInternalIP := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker) + + nodeCtx := client.WithNodes(suite.ctx, nodeInternalIP) + + node, err := suite.GetK8sNodeByInternalIP(suite.ctx, nodeInternalIP) + suite.Require().NoError(err) + + if strings.HasSuffix(node.Name, ".ec2.internal") { + suite.T().Skip("aws does not support hostname changes") + } + + oldHostname := node.Name + + newHostname := "test-update-hostname" + + err = suite.updateHostname(nodeCtx, nodeInternalIP, newHostname) + suite.Require().NoError(err) + + nodeReady := func(status corev1.ConditionStatus) bool { + return status == corev1.ConditionTrue + } + + nodeNotReady := func(status corev1.ConditionStatus) bool { + return status != corev1.ConditionTrue + } + + defer func() { + // revert the hostname back to the original one + err = suite.updateHostname(nodeCtx, nodeInternalIP, oldHostname) + suite.Require().NoError(err) + + // expect node status to be Ready again + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, oldHostname, nodeReady)) + + // Delete the node with the test hostname + err = suite.Clientset.CoreV1().Nodes().Delete(suite.ctx, newHostname, metav1.DeleteOptions{}) + suite.Require().NoError(err) + + // Reboot node for CNI bridge to be reconfigured: https://stackoverflow.com/questions/61373366 + suite.AssertRebooted( + suite.ctx, nodeInternalIP, func(nodeCtx context.Context) error { + return base.IgnoreGRPCUnavailable(suite.Client.Reboot(nodeCtx)) + }, 10*time.Minute, + ) + }() + + // expect node with old hostname to become NotReady + suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, oldHostname, nodeNotReady)) + + // expect node with new hostname to become Ready + 
suite.Assert().NoError(suite.WaitForK8sNodeReadinessStatus(suite.ctx, newHostname, nodeReady)) + + // Delete the node with the old hostname + err = suite.Clientset.CoreV1().Nodes().Delete(suite.ctx, oldHostname, metav1.DeleteOptions{}) + suite.Require().NoError(err) +} + +func (suite *UpdateHostnameSuite) updateHostname(nodeCtx context.Context, nodeIP string, newHostname string) error { + nodeConfig, err := suite.ReadConfigFromNode(nodeCtx) + if err != nil { + return err + } + + nodeConfigRaw, ok := nodeConfig.Raw().(*v1alpha1.Config) + if !ok { + return fmt.Errorf("unexpected node config type %T", nodeConfig.Raw()) + } + + nodeConfigRaw.MachineConfig.MachineNetwork.NetworkHostname = newHostname + + bytes, err := nodeConfigRaw.Bytes() + if err != nil { + return err + } + + _, err = suite.Client.ApplyConfiguration(nodeCtx, &machineapi.ApplyConfigurationRequest{ + Data: bytes, + Mode: machineapi.ApplyConfigurationRequest_NO_REBOOT, + }) + + return err +} + +func init() { + allSuites = append(allSuites, new(UpdateHostnameSuite)) +} diff --git a/internal/integration/base/k8s.go b/internal/integration/base/k8s.go index 6ddd74b15..90eba044a 100644 --- a/internal/integration/base/k8s.go +++ b/internal/integration/base/k8s.go @@ -9,8 +9,13 @@ package base import ( "context" + "fmt" "time" + "github.com/talos-systems/go-retry/retry" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -49,3 +54,60 @@ func (k8sSuite *K8sSuite) SetupSuite() { k8sSuite.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(config) k8sSuite.Require().NoError(err) } + +// GetK8sNodeByInternalIP returns the kubernetes node by its internal ip or error if it is not found. 
+func (k8sSuite *K8sSuite) GetK8sNodeByInternalIP(ctx context.Context, internalIP string) (*corev1.Node, error) { + nodeList, err := k8sSuite.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, item := range nodeList.Items { + for _, address := range item.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + if address.Address == internalIP { + return &item, nil + } + } + } + } + + return nil, fmt.Errorf("node with internal IP %s not found", internalIP) +} + +// WaitForK8sNodeReadinessStatus waits for node to have the given status. +// It retries until the node with the name is found and matches the expected condition. +func (k8sSuite *K8sSuite) WaitForK8sNodeReadinessStatus(ctx context.Context, nodeName string, checkFn func(corev1.ConditionStatus) bool) error { + return retry.Constant(5 * time.Minute).Retry(func() error { + readinessStatus, err := k8sSuite.GetK8sNodeReadinessStatus(ctx, nodeName) + if errors.IsNotFound(err) { + return retry.ExpectedError(err) + } + + if err != nil { + return err + } + + if !checkFn(readinessStatus) { + return retry.ExpectedError(fmt.Errorf("node readiness status is %s", readinessStatus)) + } + + return nil + }) +} + +// GetK8sNodeReadinessStatus returns the node readiness status of the node. 
+func (k8sSuite *K8sSuite) GetK8sNodeReadinessStatus(ctx context.Context, nodeName string) (corev1.ConditionStatus, error) { + node, err := k8sSuite.Clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return "", err + } + + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + return condition.Status, nil + } + } + + return "", fmt.Errorf("node %s has no readiness condition", nodeName) +} diff --git a/pkg/machinery/resources/k8s/kubelet_spec.go b/pkg/machinery/resources/k8s/kubelet_spec.go index b6051180e..9ed277bdb 100644 --- a/pkg/machinery/resources/k8s/kubelet_spec.go +++ b/pkg/machinery/resources/k8s/kubelet_spec.go @@ -19,10 +19,11 @@ type KubeletSpec = typed.Resource[KubeletSpecSpec, KubeletSpecRD] // KubeletSpecSpec holds the source of kubelet configuration. type KubeletSpecSpec struct { - Image string `yaml:"image"` - Args []string `yaml:"args,omitempty"` - ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty"` - Config map[string]interface{} `yaml:"config"` + Image string `yaml:"image"` + Args []string `yaml:"args,omitempty"` + ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty"` + ExpectedNodename string `yaml:"expectedNodename,omitempty"` + Config map[string]interface{} `yaml:"config"` } // NewKubeletSpec initializes an empty KubeletSpec resource.