chore: remove cpu_manager_state on cpuManagerPolicy change

After `kubelet` is stopped, remove `/var/lib/kubelet/cpu_manager_state` if `cpuManagerPolicy` has changed.
We do not add any other safeguards, so it is the user's responsibility to cordon/drain the node in advance.

Also minor fixes in other files.

Closes #7504

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
This commit is contained in:
Dmitriy Matrenichev 2023-07-31 23:38:35 +03:00
parent 018e7f5871
commit abf3831174
No known key found for this signature in database
GPG Key ID: D3363CF894E68892
6 changed files with 139 additions and 64 deletions

View File

@ -9,6 +9,7 @@ import (
"context"
"crypto/x509"
"encoding/base64"
stdjson "encoding/json"
"encoding/pem"
"errors"
"fmt"
@ -19,7 +20,10 @@ import (
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/go-pointer"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
@ -89,10 +93,8 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
}
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil {
return nil //nolint:nilerr
}
_, err := r.Get(ctx, resource.NewMetadata(files.NamespaceName, files.EtcFileStatusType, "machine-id", resource.VersionUndefined))
@ -141,13 +143,11 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
r.QueueReconcile()
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil {
return nil //nolint:nilerr
}
cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletSpecType, k8s.KubeletID, resource.VersionUndefined))
cfg, err := safe.ReaderGetByID[*k8s.KubeletSpec](ctx, r, k8s.KubeletID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -156,9 +156,9 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
return fmt.Errorf("error getting config: %w", err)
}
cfgSpec := cfg.(*k8s.KubeletSpec).TypedSpec()
cfgSpec := cfg.TypedSpec()
secret, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubeletType, secrets.KubeletID, resource.VersionUndefined))
secret, err := safe.ReaderGetByID[*secrets.Kubelet](ctx, r, secrets.KubeletID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -167,7 +167,7 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
return fmt.Errorf("error getting secrets: %w", err)
}
secretSpec := secret.(*secrets.Kubelet).TypedSpec()
secretSpec := secret.TypedSpec()
if err = ctrl.writePKI(secretSpec); err != nil {
return fmt.Errorf("error writing kubelet PKI: %w", err)
@ -190,17 +190,21 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
// refresh certs only if we are managing the node name (not overridden by the user)
if cfgSpec.ExpectedNodename != "" {
err = ctrl.refreshKubeletCerts(logger, cfgSpec.ExpectedNodename)
err = ctrl.refreshKubeletCerts(cfgSpec.ExpectedNodename, logger)
if err != nil {
return err
}
}
if err = ctrl.handlePolicyChange(cfgSpec, logger); err != nil {
return err
}
if err = ctrl.refreshSelfServingCert(); err != nil {
return err
}
if err = ctrl.updateKubeconfig(logger, secretSpec.Endpoint); err != nil {
if err = ctrl.updateKubeconfig(secretSpec.Endpoint, logger); err != nil {
return err
}
@ -212,6 +216,83 @@ func (ctrl *KubeletServiceController) Run(ctx context.Context, r controller.Runt
}
}
// handlePolicyChange reacts to a change of the kubelet cpuManagerPolicy.
//
// The kubelet refuses to start when its cpu_manager_state checkpoint was
// written under a different policy, so when the configured policy differs
// from the one recorded in the checkpoint file, the file is removed and will
// be recreated by the kubelet on next start. A missing checkpoint file means
// there is nothing to reconcile.
func (ctrl *KubeletServiceController) handlePolicyChange(cfgSpec *k8s.KubeletSpecSpec, logger *zap.Logger) error {
	const managerFilename = "/var/lib/kubelet/cpu_manager_state"

	previous, err := loadPolicyFromFile(managerFilename)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// no checkpoint file on disk, nothing to do
			return nil
		}

		return fmt.Errorf("error loading cpu_manager_state file: %w", err)
	}

	configured, err := getFromMap[string](cfgSpec.Config, "cpuManagerPolicy")
	if err != nil {
		return err
	}

	current := configured.ValueOrZero()

	if equalPolicy(previous, current) {
		return nil
	}

	logger.Info("cpuManagerPolicy changed", zap.String("old", previous), zap.String("new", current))

	if err := os.Remove(managerFilename); err != nil {
		return fmt.Errorf("error removing cpu_manager_state file: %w", err)
	}

	return nil
}
func loadPolicyFromFile(filename string) (string, error) {
raw, err := os.ReadFile(filename)
if err != nil {
return "", err
}
cpuManagerState := struct {
Policy string `json:"policyName"`
}{}
if err = stdjson.Unmarshal(raw, &cpuManagerState); err != nil {
return "", err
}
return cpuManagerState.Policy, nil
}
// equalPolicy reports whether two cpuManagerPolicy values denote the same
// policy, treating "none" and the empty string as equivalent (the kubelet
// default policy).
func equalPolicy(current, newOne string) bool {
	normalize := func(policy string) string {
		if policy == "none" {
			return ""
		}

		return policy
	}

	return normalize(current) == normalize(newOne)
}
// getFromMap looks up key in m and returns its value as T.
//
// The result is empty when the key is absent; an error is returned when the
// value is present but has a type other than T.
func getFromMap[T any](m map[string]any, key string) (optional.Optional[T], error) {
	var zero optional.Optional[T]

	raw, found := m[key]
	if !found {
		return zero, nil
	}

	typed, ok := raw.(T)
	if !ok {
		var want T

		return zero, fmt.Errorf("unexpected type for key %q: found %T, expected %T", key, raw, want)
	}

	return optional.Some(typed), nil
}
func (ctrl *KubeletServiceController) writePKI(secretSpec *secrets.KubeletSpec) error {
cfg := struct {
Server string
@ -289,7 +370,7 @@ func (ctrl *KubeletServiceController) writeConfig(cfgSpec *k8s.KubeletSpecSpec)
}
// updateKubeconfig updates the kubeconfig of kubelet with the given endpoint if it exists.
func (ctrl *KubeletServiceController) updateKubeconfig(logger *zap.Logger, newEndpoint *url.URL) error {
func (ctrl *KubeletServiceController) updateKubeconfig(newEndpoint *url.URL, logger *zap.Logger) error {
config, err := clientcmd.LoadFromFile(constants.KubeletKubeconfig)
if errors.Is(err, os.ErrNotExist) {
return nil
@ -328,7 +409,7 @@ func (ctrl *KubeletServiceController) updateKubeconfig(logger *zap.Logger, newEn
// refreshKubeletCerts checks if the existing kubelet certificates match the node hostname.
// If they don't match, it clears the certificate directory and the removes kubelet's kubeconfig so that
// they can be regenerated next time kubelet is started.
func (ctrl *KubeletServiceController) refreshKubeletCerts(logger *zap.Logger, nodename string) error {
func (ctrl *KubeletServiceController) refreshKubeletCerts(nodename string, logger *zap.Logger) error {
cert, err := ctrl.readKubeletClientCertificate()
if err != nil {
return err

View File

@ -12,9 +12,10 @@ import (
"time"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/hashicorp/go-multierror"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/slices"
"github.com/siderolabs/go-pointer"
"go.uber.org/zap"
@ -78,13 +79,11 @@ func (ctrl *KubeletSpecController) Outputs() []controller.Output {
//nolint:gocyclo
func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
if _, ok := channel.RecvWithContext(ctx, r.EventCh()); !ok && ctx.Err() != nil {
return nil //nolint:nilerr
}
cfg, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletConfigType, k8s.KubeletID, resource.VersionUndefined))
cfg, err := safe.ReaderGetByID[*k8s.KubeletConfig](ctx, r, k8s.KubeletID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -93,9 +92,9 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
return fmt.Errorf("error getting config: %w", err)
}
cfgSpec := cfg.(*k8s.KubeletConfig).TypedSpec()
cfgSpec := cfg.TypedSpec()
nodename, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
nodename, err := safe.ReaderGetByID[*k8s.Nodename](ctx, r, k8s.NodenameID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -104,9 +103,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
return fmt.Errorf("error getting nodename: %w", err)
}
nodenameSpec := nodename.(*k8s.Nodename).TypedSpec()
expectedNodename := nodenameSpec.Nodename
expectedNodename := nodename.TypedSpec().Nodename
args := argsbuilder.Args{
"config": "/etc/kubernetes/kubelet.yaml",
@ -134,20 +131,16 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
// if the user supplied node-ip via extra args, no need to pick automatically
if !extraArgs.Contains("node-ip") {
var nodeIP resource.Resource
nodeIP, err = r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPType, k8s.KubeletID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
nodeIP, nodeErr := safe.ReaderGetByID[*k8s.NodeIP](ctx, r, k8s.KubeletID)
if nodeErr != nil {
if state.IsNotFoundError(nodeErr) {
continue
}
return fmt.Errorf("error getting node IPs: %w", err)
return fmt.Errorf("error getting node IPs: %w", nodeErr)
}
nodeIPSpec := nodeIP.(*k8s.NodeIP).TypedSpec()
nodeIPsString := slices.Map(nodeIPSpec.Addresses, netip.Addr.String)
nodeIPsString := slices.Map(nodeIP.TypedSpec().Addresses, netip.Addr.String)
args["node-ip"] = strings.Join(nodeIPsString, ",")
}
@ -181,11 +174,12 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
return fmt.Errorf("error converting to unstructured: %w", err)
}
if err = r.Modify(
if err = safe.WriterModify(
ctx,
r,
k8s.NewKubeletSpec(k8s.NamespaceName, k8s.KubeletID),
func(r resource.Resource) error {
kubeletSpec := r.(*k8s.KubeletSpec).TypedSpec()
func(r *k8s.KubeletSpec) error {
kubeletSpec := r.TypedSpec()
kubeletSpec.Image = cfgSpec.Image
kubeletSpec.ExtraMounts = cfgSpec.ExtraMounts

View File

@ -54,8 +54,8 @@ func Services(runtime runtime.Runtime) *singleton {
once.Do(func() {
instance = &singleton{
runtime: runtime,
state: make(map[string]*ServiceRunner),
running: make(map[string]struct{}),
state: map[string]*ServiceRunner{},
running: map[string]struct{}{},
}
})
@ -102,7 +102,7 @@ func (s *singleton) Unload(ctx context.Context, serviceIDs ...string) error {
return nil
}
servicesToRemove := []string{}
servicesToRemove := make([]string, 0, len(serviceIDs))
for _, id := range serviceIDs {
if _, exists := s.state[id]; exists {
@ -264,7 +264,7 @@ func (s *singleton) StopWithRevDepenencies(ctx context.Context, serviceIDs ...st
//nolint:gocyclo
func (s *singleton) stopServices(ctx context.Context, services []string, waitForRevDependencies bool) error {
servicesToStop := make(map[string]*ServiceRunner)
servicesToStop := map[string]*ServiceRunner{}
if services == nil {
for name, svcrunner := range s.state {
@ -282,7 +282,7 @@ func (s *singleton) stopServices(ctx context.Context, services []string, waitFor
// build reverse dependencies, and expand the list of services to stop
// with services which depend on the one being stopped
reverseDependencies := make(map[string][]string)
reverseDependencies := map[string][]string{}
if waitForRevDependencies {
// expand the list of services to stop with the list of services which depend
@ -342,7 +342,7 @@ func (s *singleton) stopServices(ctx context.Context, services []string, waitFor
shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, 30*time.Second)
defer shutdownCtxCancel()
stoppedConds := []conditions.Condition{}
stoppedConds := make([]conditions.Condition, 0, len(servicesToStop))
for name, svcrunner := range servicesToStop {
shutdownWg.Add(1)

View File

@ -224,7 +224,7 @@ func (o KubeletSpecSpec) DeepCopy() KubeletSpecSpec {
}
}
if o.Config != nil {
cp.Config = make(map[string]interface{}, len(o.Config))
cp.Config = make(map[string]any, len(o.Config))
for k2, v2 := range o.Config {
cp.Config[k2] = v2
}
@ -302,7 +302,7 @@ func (o KubeletConfigSpec) DeepCopy() KubeletConfigSpec {
}
}
if o.ExtraConfig != nil {
cp.ExtraConfig = make(map[string]interface{}, len(o.ExtraConfig))
cp.ExtraConfig = make(map[string]any, len(o.ExtraConfig))
for k2, v2 := range o.ExtraConfig {
cp.ExtraConfig[k2] = v2
}

View File

@ -27,18 +27,18 @@ type KubeletConfig = typed.Resource[KubeletConfigSpec, KubeletConfigExtension]
//
//gotagsrewrite:gen
type KubeletConfigSpec struct {
Image string `yaml:"image" protobuf:"1"`
ClusterDNS []string `yaml:"clusterDNS" protobuf:"2"`
ClusterDomain string `yaml:"clusterDomain" protobuf:"3"`
ExtraArgs map[string]string `yaml:"extraArgs,omitempty" protobuf:"4"`
ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"5"`
ExtraConfig map[string]interface{} `yaml:"extraConfig,omitempty" protobuf:"6"`
CloudProviderExternal bool `yaml:"cloudProviderExternal" protobuf:"7"`
DefaultRuntimeSeccompEnabled bool `yaml:"defaultRuntimeSeccompEnabled" protobuf:"8"`
SkipNodeRegistration bool `yaml:"skipNodeRegistration" protobuf:"9"`
StaticPodListURL string `yaml:"staticPodListURL" protobuf:"10"`
DisableManifestsDirectory bool `yaml:"disableManifestsDirectory" protobuf:"11"`
EnableFSQuotaMonitoring bool `yaml:"enableFSQuotaMonitoring" protobuf:"12"`
Image string `yaml:"image" protobuf:"1"`
ClusterDNS []string `yaml:"clusterDNS" protobuf:"2"`
ClusterDomain string `yaml:"clusterDomain" protobuf:"3"`
ExtraArgs map[string]string `yaml:"extraArgs,omitempty" protobuf:"4"`
ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"5"`
ExtraConfig map[string]any `yaml:"extraConfig,omitempty" protobuf:"6"`
CloudProviderExternal bool `yaml:"cloudProviderExternal" protobuf:"7"`
DefaultRuntimeSeccompEnabled bool `yaml:"defaultRuntimeSeccompEnabled" protobuf:"8"`
SkipNodeRegistration bool `yaml:"skipNodeRegistration" protobuf:"9"`
StaticPodListURL string `yaml:"staticPodListURL" protobuf:"10"`
DisableManifestsDirectory bool `yaml:"disableManifestsDirectory" protobuf:"11"`
EnableFSQuotaMonitoring bool `yaml:"enableFSQuotaMonitoring" protobuf:"12"`
}
// NewKubeletConfig initializes an empty KubeletConfig resource.

View File

@ -24,11 +24,11 @@ type KubeletSpec = typed.Resource[KubeletSpecSpec, KubeletSpecExtension]
//
//gotagsrewrite:gen
type KubeletSpecSpec struct {
Image string `yaml:"image" protobuf:"1"`
Args []string `yaml:"args,omitempty" protobuf:"2"`
ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"3"`
ExpectedNodename string `yaml:"expectedNodename,omitempty" protobuf:"4"`
Config map[string]interface{} `yaml:"config" protobuf:"5"`
Image string `yaml:"image" protobuf:"1"`
Args []string `yaml:"args,omitempty" protobuf:"2"`
ExtraMounts []specs.Mount `yaml:"extraMounts,omitempty" protobuf:"3"`
ExpectedNodename string `yaml:"expectedNodename,omitempty" protobuf:"4"`
Config map[string]any `yaml:"config" protobuf:"5"`
}
// NewKubeletSpec initializes an empty KubeletSpec resource.