From abf3831174e15f2710d690208e10887ff67a8f5c Mon Sep 17 00:00:00 2001 From: Dmitriy Matrenichev Date: Mon, 31 Jul 2023 23:38:35 +0300 Subject: [PATCH] chore: remove `cpu_manager_state` on `cpuManagerPolicy` change After we closed `kubelet`, remove `/var/lib/kubelet/cpu_manager_state` if there are any changes in `cpuManagerPolicy`. We do not add any other safeguards, so it's user responsibility to cordon/drain the node in advance. Also minor fixes in other files. Closes #7504 Signed-off-by: Dmitriy Matrenichev --- .../pkg/controllers/k8s/kubelet_service.go | 113 +++++++++++++++--- .../pkg/controllers/k8s/kubelet_spec.go | 40 +++---- internal/app/machined/pkg/system/system.go | 12 +- .../resources/k8s/deep_copy.generated.go | 4 +- pkg/machinery/resources/k8s/kubelet_config.go | 24 ++-- pkg/machinery/resources/k8s/kubelet_spec.go | 10 +- 6 files changed, 139 insertions(+), 64 deletions(-) diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go index 46e0c5f36..71e9f4dd9 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_service.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_service.go @@ -9,6 +9,7 @@ import ( "context" "crypto/x509" "encoding/base64" + stdjson "encoding/json" "encoding/pem" "errors" "fmt" @@ -19,7 +20,10 @@ import ( "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" + "github.com/siderolabs/gen/channel" + "github.com/siderolabs/gen/optional" "github.com/siderolabs/go-pointer" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" @@ -89,10 +93,8 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt } for { - select { - case <-ctx.Done(): - return nil - case <-r.EventCh(): + if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil { + return nil //nolint:nilerr } _, err := r.Get(ctx, resource.NewMetadata(files.NamespaceName, files.EtcFileStatusType, "machine-id", resource.VersionUndefined)) @@ -141,13 +143,11 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt r.QueueReconcile() for { - select { - case <-ctx.Done(): - return nil - case <-r.EventCh(): + if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil { + return nil //nolint:nilerr } - cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined)) + cfg, err := safe.ReaderGetByID[*k8s.KubeletSpec](ctx, r, k8s.KubeletID) if err != nil { if state.IsNotFoundError(err) { continue @@ -156,9 +156,9 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt return fmt.Errorf("error getting config: %w", err) } - cfgSpec := cfg.(*k8s.KubeletSpec).TypedSpec() + cfgSpec := cfg.TypedSpec() - secret, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubeletType, secrets.KubeletID, resource.VersionUndefined)) + secret, err := safe.ReaderGetByID[*secrets.Kubelet](ctx, r, secrets.KubeletID) if err != nil { if state.IsNotFoundError(err) { continue @@ -167,7 +167,7 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt return fmt.Errorf("error getting secrets: %w", err) } - secretSpec := secret.(*secrets.Kubelet).TypedSpec() + secretSpec := secret.TypedSpec() if err = ctrl.writePKI(secretSpec); err != nil { return fmt.Errorf("error writing kubelet PKI: %w", err) @@ -190,17 +190,21 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt // refresh certs only if we are managing the node name (not overridden by the user) if cfgSpec.ExpectedNodename != "" { - err = ctrl.refreshKubeletCerts(logger, cfgSpec.ExpectedNodename) + err = ctrl.refreshKubeletCerts(cfgSpec.ExpectedNodename, logger) if err != nil { return err } } + if err = ctrl.handlePolicyChange(cfgSpec, logger); err != nil { + return err + } + if err = ctrl.refreshSelfServingCert(); err != nil { return err } - if err = ctrl.updateKubeconfig(logger, secretSpec.Endpoint); err != nil { + if err = ctrl.updateKubeconfig(secretSpec.Endpoint, logger); err != nil { return err } @@ -212,6 +216,83 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt } } +// handlePolicyChange handles the cpuManagerPolicy change. +func (ctrl *KubeletServiceController) handlePolicyChange(cfgSpec *k8s.KubeletSpecSpec, logger *zap.Logger) error { + const managerFilename = "/var/lib/kubelet/cpu_manager_state" + + oldPolicy, err := loadPolicyFromFile(managerFilename) + + switch { + case errors.Is(err, os.ErrNotExist): + return nil // no cpu_manager_state file, nothing to do + case err != nil: + return fmt.Errorf("error loading cpu_manager_state file: %w", err) + } + + policy, err := getFromMap[string](cfgSpec.Config, "cpuManagerPolicy") + if err != nil { + return err + } + + newPolicy := policy.ValueOrZero() + if equalPolicy(oldPolicy, newPolicy) { + return nil + } + + logger.Info("cpuManagerPolicy changed", zap.String("old", oldPolicy), zap.String("new", newPolicy)) + + err = os.Remove(managerFilename) + if err != nil { + return fmt.Errorf("error removing cpu_manager_state file: %w", err) + } + + return nil +} + +func loadPolicyFromFile(filename string) (string, error) { + raw, err := os.ReadFile(filename) + if err != nil { + return "", err + } + + cpuManagerState := struct { + Policy string `json:"policyName"` + }{} + + if err = stdjson.Unmarshal(raw, &cpuManagerState); err != nil { + return "", err + } + + return cpuManagerState.Policy, nil +} + +func equalPolicy(current, newOne string) bool { + if current == "none" { + current = "" + } + + if newOne == "none" { + newOne = "" + } + + return current == newOne +} + +func getFromMap[T any](m map[string]any, key string) (optional.Optional[T], error) { + var zero optional.Optional[T] + + res, ok := m[key] + if !ok { + return zero, nil + } + + if res, ok := res.(T); ok { + return optional.Some(res), nil + } + + return zero, fmt.Errorf("unexpected type for key %q: found %T, expected %T", key, res, *new(T)) +} + func (ctrl *KubeletServiceController) writePKI(secretSpec *secrets.KubeletSpec) error { cfg := struct { Server string @@ -289,7 +370,7 @@ func (ctrl *KubeletServiceController) writeConfig(cfgSpec *k8s.KubeletSpecSpec) } // updateKubeconfig updates the kubeconfig of kubelet with the given endpoint if it exists. -func (ctrl *KubeletServiceController) updateKubeconfig(logger *zap.Logger, newEndpoint *url.URL) error { +func (ctrl *KubeletServiceController) updateKubeconfig(newEndpoint *url.URL, logger *zap.Logger) error { config, err := clientcmd.LoadFromFile(constants.KubeletKubeconfig) if errors.Is(err, os.ErrNotExist) { return nil @@ -328,7 +409,7 @@ func (ctrl *KubeletServiceController) updateKubeconfig(logger *zap.Logger, newEn // refreshKubeletCerts checks if the existing kubelet certificates match the node hostname. // If they don't match, it clears the certificate directory and the removes kubelet's kubeconfig so that // they can be regenerated next time kubelet is started. -func (ctrl *KubeletServiceController) refreshKubeletCerts(logger *zap.Logger, nodename string) error { +func (ctrl *KubeletServiceController) refreshKubeletCerts(nodename string, logger *zap.Logger) error { cert, err := ctrl.readKubeletClientCertificate() if err != nil { return err diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go index cf5ab0b40..3684defeb 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go @@ -12,9 +12,10 @@ import ( "time" "github.com/cosi-project/runtime/pkg/controller" - "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" "github.com/hashicorp/go-multierror" + "github.com/siderolabs/gen/channel" "github.com/siderolabs/gen/slices" "github.com/siderolabs/go-pointer" "go.uber.org/zap" @@ -78,13 +79,11 @@ func (ctrl *KubeletSpecController) Outputs() []controller.Output { //nolint:gocyclo func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { for { - select { - case <-ctx.Done(): - return nil - case <-r.EventCh(): + if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil { + return nil //nolint:nilerr } - cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletConfigType, k8s.KubeletID, resource.VersionUndefined)) + cfg, err := safe.ReaderGetByID[*k8s.KubeletConfig](ctx, r, k8s.KubeletID) if err != nil { if state.IsNotFoundError(err) { continue @@ -93,9 +92,9 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime return fmt.Errorf("error getting config: %w", err) } - cfgSpec := cfg.(*k8s.KubeletConfig).TypedSpec() + cfgSpec := cfg.TypedSpec() - nodename, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined)) + nodename, err := safe.ReaderGetByID[*k8s.Nodename](ctx, r, k8s.NodenameID) if err != nil { if state.IsNotFoundError(err) { continue @@ -104,9 +103,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime return fmt.Errorf("error getting nodename: %w", err) } - nodenameSpec := nodename.(*k8s.Nodename).TypedSpec() - - expectedNodename := nodenameSpec.Nodename + expectedNodename := nodename.TypedSpec().Nodename args := argsbuilder.Args{ "config": "/etc/kubernetes/kubelet.yaml", @@ -134,20 +131,16 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime // if the user supplied node-ip via extra args, no need to pick automatically if !extraArgs.Contains("node-ip") { - var nodeIP resource.Resource - - nodeIP, err = r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPType, k8s.KubeletID, resource.VersionUndefined)) - if err != nil { - if state.IsNotFoundError(err) { + nodeIP, nodeErr := safe.ReaderGetByID[*k8s.NodeIP](ctx, r, k8s.KubeletID) + if nodeErr != nil { + if state.IsNotFoundError(nodeErr) { continue } - return fmt.Errorf("error getting node IPs: %w", err) + return fmt.Errorf("error getting node IPs: %w", nodeErr) } - nodeIPSpec := nodeIP.(*k8s.NodeIP).TypedSpec() - - nodeIPsString := slices.Map(nodeIPSpec.Addresses, netip.Addr.String) + nodeIPsString := slices.Map(nodeIP.TypedSpec().Addresses, netip.Addr.String) args["node-ip"] = strings.Join(nodeIPsString, ",") } @@ -181,11 +174,12 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime return fmt.Errorf("error converting to unstructured: %w", err) } - if err = r.Modify( + if err = safe.WriterModify( ctx, + r, k8s.NewKubeletSpec(k8s.NamespaceName, k8s.KubeletID), - func(r resource.Resource) error { - kubeletSpec := r.(*k8s.KubeletSpec).TypedSpec() + func(r *k8s.KubeletSpec) error { + kubeletSpec := r.TypedSpec() kubeletSpec.Image = cfgSpec.Image kubeletSpec.ExtraMounts = cfgSpec.ExtraMounts diff --git a/internal/app/machined/pkg/system/system.go b/internal/app/machined/pkg/system/system.go index e5a544d96..9e98fe261 100644 --- a/internal/app/machined/pkg/system/system.go +++ b/internal/app/machined/pkg/system/system.go @@ -54,8 +54,8 @@ func Services(runtime runtime.Runtime) *singleton { once.Do(func() { instance = &singleton{ runtime: runtime, - state: make(map[string]*ServiceRunner), - running: make(map[string]struct{}), + state: map[string]*ServiceRunner{}, + running: map[string]struct{}{}, } }) @@ -102,7 +102,7 @@ func (s *singleton) Unload(ctx context.Context, serviceIDs ...string) error { return nil } - servicesToRemove := []string{} + servicesToRemove := make([]string, 0, len(serviceIDs)) for _, id := range serviceIDs { if _, exists := s.state[id]; exists { @@ -264,7 +264,7 @@ func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...st //nolint:gocyclo func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error { - servicesToStop := make(map[string]*ServiceRunner) + servicesToStop := map[string]*ServiceRunner{} if services == nil { for name, svcrunner := range s.state { @@ -282,7 +282,7 @@ func (s *singleton) stopServices(ctx context.Context, services []string, waitFor // build reverse dependencies, and expand the list of services to stop // with services which depend on the one being stopped - reverseDependencies := make(map[string][]string) + reverseDependencies := map[string][]string{} if waitForRevDependencies { // expand the list of services to stop with the list of services which depend @@ -342,7 +342,7 @@ func (s *singleton) stopServices(ctx context.Context, services []string, waitFor shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, 30*time.Second) defer shutdownCtxCancel() - stoppedConds := []conditions.Condition{} + stoppedConds := make([]conditions.Condition, 0, len(servicesToStop)) for name, svcrunner := range servicesToStop { shutdownWg.Add(1) diff --git a/pkg/machinery/resources/k8s/deep_copy.generated.go b/pkg/machinery/resources/k8s/deep_copy.generated.go index e84b30041..c44700330 100644 --- a/pkg/machinery/resources/k8s/deep_copy.generated.go +++ b/pkg/machinery/resources/k8s/deep_copy.generated.go @@ -224,7 +224,7 @@ func (o KubeletSpecSpec) DeepCopy() KubeletSpecSpec { } } if o.Config != nil { - cp.Config = make(map[string]interface{}, len(o.Config)) + cp.Config = make(map[string]any, len(o.Config)) for k2, v2 := range o.Config { cp.Config[k2] = v2 } @@ -302,7 +302,7 @@ func (o KubeletConfigSpec) DeepCopy() KubeletConfigSpec { } } if o.ExtraConfig != nil { - cp.ExtraConfig = make(map[string]interface{}, len(o.ExtraConfig)) + cp.ExtraConfig = make(map[string]any, len(o.ExtraConfig)) for k2, v2 := range o.ExtraConfig { cp.ExtraConfig[k2] = v2 } diff --git a/pkg/machinery/resources/k8s/kubelet_config.go b/pkg/machinery/resources/k8s/kubelet_config.go index 3dd324ef2..2327ec2f3 100644 --- a/pkg/machinery/resources/k8s/kubelet_config.go +++ b/pkg/machinery/resources/k8s/kubelet_config.go @@ -27,18 +27,18 @@ type KubeletConfig = typed.Resource[KubeletConfigSpec, KubeletConfigExtension] // //gotagsrewrite:gen type KubeletConfigSpec struct { - Image string `yaml:"image" protobuf:"1"` - ClusterDNS []string `yaml:"clusterDNS" protobuf:"2"` - ClusterDomain string `yaml:"clusterDomain" protobuf:"3"` - ExtraArgs map[string]string `yaml:"extraArgs,omitempty" protobuf:"4"` - ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"5"` - ExtraConfig map[string]interface{} `yaml:"extraConfig,omitempty" protobuf:"6"` - CloudProviderExternal bool `yaml:"cloudProviderExternal" protobuf:"7"` - DefaultRuntimeSeccompEnabled bool `yaml:"defaultRuntimeSeccompEnabled" protobuf:"8"` - SkipNodeRegistration bool `yaml:"skipNodeRegistration" protobuf:"9"` - StaticPodListURL string `yaml:"staticPodListURL" protobuf:"10"` - DisableManifestsDirectory bool `yaml:"disableManifestsDirectory" protobuf:"11"` - EnableFSQuotaMonitoring bool `yaml:"enableFSQuotaMonitoring" protobuf:"12"` + Image string `yaml:"image" protobuf:"1"` + ClusterDNS []string `yaml:"clusterDNS" protobuf:"2"` + ClusterDomain string `yaml:"clusterDomain" protobuf:"3"` + ExtraArgs map[string]string `yaml:"extraArgs,omitempty" protobuf:"4"` + ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"5"` + ExtraConfig map[string]any `yaml:"extraConfig,omitempty" protobuf:"6"` + CloudProviderExternal bool `yaml:"cloudProviderExternal" protobuf:"7"` + DefaultRuntimeSeccompEnabled bool `yaml:"defaultRuntimeSeccompEnabled" protobuf:"8"` + SkipNodeRegistration bool `yaml:"skipNodeRegistration" protobuf:"9"` + StaticPodListURL string `yaml:"staticPodListURL" protobuf:"10"` + DisableManifestsDirectory bool `yaml:"disableManifestsDirectory" protobuf:"11"` + EnableFSQuotaMonitoring bool `yaml:"enableFSQuotaMonitoring" protobuf:"12"` } // NewKubeletConfig initializes an empty KubeletConfig resource. diff --git a/pkg/machinery/resources/k8s/kubelet_spec.go b/pkg/machinery/resources/k8s/kubelet_spec.go index 5b095f852..ba29b7022 100644 --- a/pkg/machinery/resources/k8s/kubelet_spec.go +++ b/pkg/machinery/resources/k8s/kubelet_spec.go @@ -24,11 +24,11 @@ type KubeletSpec = typed.Resource[KubeletSpecSpec, KubeletSpecExtension] // //gotagsrewrite:gen type KubeletSpecSpec struct { - Image string `yaml:"image" protobuf:"1"` - Args []string `yaml:"args,omitempty" protobuf:"2"` - ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"3"` - ExpectedNodename string `yaml:"expectedNodename,omitempty" protobuf:"4"` - Config map[string]interface{} `yaml:"config" protobuf:"5"` + Image string `yaml:"image" protobuf:"1"` + Args []string `yaml:"args,omitempty" protobuf:"2"` + ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"3"` + ExpectedNodename string `yaml:"expectedNodename,omitempty" protobuf:"4"` + Config map[string]any `yaml:"config" protobuf:"5"` } // NewKubeletSpec initializes an empty KubeletSpec resource.