LukasAuerbeck c81ce8cfb0
feat: support controlplane resources configuration
Fixes #7379

Add possibility to configure the controlplane static pod resources via
APIServer, ControllerManager and Scheduler configs.

Signed-off-by: LukasAuerbeck <17929465+LukasAuerbeck@users.noreply.github.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
2023-07-07 22:44:56 +04:00

430 lines
14 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package k8s
import (
"context"
"fmt"
"strings"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/slices"
"github.com/siderolabs/go-pointer"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"github.com/siderolabs/talos/pkg/argsbuilder"
"github.com/siderolabs/talos/pkg/images"
talosconfig "github.com/siderolabs/talos/pkg/machinery/config/config"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/nethelpers"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
)
// ControlPlaneController manages Kubernetes control plane resources based on configuration.
type ControlPlaneController struct{}
// Name implements controller.Controller interface.
func (ctrl *ControlPlaneController) Name() string {
return "k8s.ControlPlaneController"
}
// Inputs implements controller.Controller interface.
func (ctrl *ControlPlaneController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineConfigType,
ID: pointer.To(config.V1Alpha1ID),
Kind: controller.InputWeak,
},
{
Namespace: config.NamespaceName,
Type: config.MachineTypeType,
ID: pointer.To(config.MachineTypeID),
Kind: controller.InputWeak,
},
}
}
// Outputs implements controller.Controller interface.
func (ctrl *ControlPlaneController) Outputs() []controller.Output {
return []controller.Output{
{
Type: k8s.AdmissionControlConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.AuditPolicyConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.APIServerConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.ControllerManagerConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.ExtraManifestsConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.BootstrapManifestsConfigType,
Kind: controller.OutputExclusive,
},
{
Type: k8s.SchedulerConfigType,
Kind: controller.OutputExclusive,
},
}
}
// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *ControlPlaneController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}
cfg, err := safe.ReaderGetByID[*config.MachineConfig](ctx, r, config.V1Alpha1ID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
return fmt.Errorf("error destroying resources: %w", err)
}
continue
}
return fmt.Errorf("error getting config: %w", err)
}
machineType, err := safe.ReaderGet[*config.MachineType](ctx, r, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
continue
}
return fmt.Errorf("error getting machine type: %w", err)
}
if !machineType.MachineType().IsControlPlane() {
if err = ctrl.teardownAll(ctx, r); err != nil {
return fmt.Errorf("error destroying resources: %w", err)
}
continue
}
for _, f := range []func(context.Context, controller.Runtime, *zap.Logger, talosconfig.Config) error{
ctrl.manageAPIServerConfig,
ctrl.manageAdmissionControlConfig,
ctrl.manageAuditPolicyConfig,
ctrl.manageControllerManagerConfig,
ctrl.manageSchedulerConfig,
ctrl.manageManifestsConfig,
ctrl.manageExtraManifestsConfig,
} {
if err = f(ctx, r, logger, cfg.Config()); err != nil {
return fmt.Errorf("error updating objects: %w", err)
}
}
r.ResetRestartBackoff()
}
}
func convertVolumes(volumes []talosconfig.VolumeMount) []k8s.ExtraVolume {
return slices.Map(volumes, func(v talosconfig.VolumeMount) k8s.ExtraVolume {
return k8s.ExtraVolume{
Name: v.Name(),
HostPath: v.HostPath(),
MountPath: v.MountPath(),
ReadOnly: v.ReadOnly(),
}
})
}
func convertResources(resources talosconfig.Resources) k8s.Resources {
var convertedLimits map[string]string
cpuLimits := resources.CPULimits()
memoryLimits := resources.MemoryLimits()
if cpuLimits != "" || memoryLimits != "" {
convertedLimits = map[string]string{}
if cpuLimits != "" {
convertedLimits[string(v1.ResourceCPU)] = cpuLimits
}
if memoryLimits != "" {
convertedLimits[string(v1.ResourceMemory)] = memoryLimits
}
}
return k8s.Resources{
Requests: map[string]string{
string(v1.ResourceCPU): resources.CPURequests(),
string(v1.ResourceMemory): resources.MemoryRequests(),
},
Limits: convertedLimits,
}
}
func (ctrl *ControlPlaneController) manageAPIServerConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Config) error {
var cloudProvider string
if cfgProvider.Cluster().ExternalCloudProvider().Enabled() {
cloudProvider = "external" //nolint:goconst
}
advertisedAddress := "$(POD_IP)"
if cfgProvider.Machine().Kubelet().SkipNodeRegistration() {
advertisedAddress = ""
}
return safe.WriterModify(ctx, r, k8s.NewAPIServerConfig(), func(r *k8s.APIServerConfig) error {
*r.TypedSpec() = k8s.APIServerConfigSpec{
Image: cfgProvider.Cluster().APIServer().Image(),
CloudProvider: cloudProvider,
ControlPlaneEndpoint: cfgProvider.Cluster().Endpoint().String(),
EtcdServers: []string{fmt.Sprintf("https://%s", nethelpers.JoinHostPort("localhost", constants.EtcdClientPort))},
LocalPort: cfgProvider.Cluster().LocalAPIServerPort(),
ServiceCIDRs: cfgProvider.Cluster().Network().ServiceCIDRs(),
ExtraArgs: cfgProvider.Cluster().APIServer().ExtraArgs(),
ExtraVolumes: convertVolumes(cfgProvider.Cluster().APIServer().ExtraVolumes()),
EnvironmentVariables: cfgProvider.Cluster().APIServer().Env(),
PodSecurityPolicyEnabled: !cfgProvider.Cluster().APIServer().DisablePodSecurityPolicy(),
AdvertisedAddress: advertisedAddress,
Resources: convertResources(cfgProvider.Cluster().APIServer().Resources()),
}
return nil
})
}
func (ctrl *ControlPlaneController) manageAdmissionControlConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
spec := k8s.AdmissionControlConfigSpec{}
for _, cfg := range cfgProvider.Cluster().APIServer().AdmissionControl() {
spec.Config = append(spec.Config,
k8s.AdmissionPluginSpec{
Name: cfg.Name(),
Configuration: cfg.Configuration(),
},
)
}
return safe.WriterModify(ctx, r, k8s.NewAdmissionControlConfig(), func(r *k8s.AdmissionControlConfig) error {
*r.TypedSpec() = spec
return nil
})
}
func (ctrl *ControlPlaneController) manageAuditPolicyConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
spec := k8s.AuditPolicyConfigSpec{}
spec.Config = cfgProvider.Cluster().APIServer().AuditPolicy()
return safe.WriterModify(ctx, r, k8s.NewAuditPolicyConfig(), func(r *k8s.AuditPolicyConfig) error {
*r.TypedSpec() = spec
return nil
})
}
func (ctrl *ControlPlaneController) manageControllerManagerConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
var cloudProvider string
if cfgProvider.Cluster().ExternalCloudProvider().Enabled() {
cloudProvider = "external"
}
return safe.WriterModify(ctx, r, k8s.NewControllerManagerConfig(), func(r *k8s.ControllerManagerConfig) error {
*r.TypedSpec() = k8s.ControllerManagerConfigSpec{
Enabled: !cfgProvider.Machine().Controlplane().ControllerManager().Disabled(),
Image: cfgProvider.Cluster().ControllerManager().Image(),
CloudProvider: cloudProvider,
PodCIDRs: cfgProvider.Cluster().Network().PodCIDRs(),
ServiceCIDRs: cfgProvider.Cluster().Network().ServiceCIDRs(),
ExtraArgs: cfgProvider.Cluster().ControllerManager().ExtraArgs(),
ExtraVolumes: convertVolumes(cfgProvider.Cluster().ControllerManager().ExtraVolumes()),
EnvironmentVariables: cfgProvider.Cluster().ControllerManager().Env(),
}
return nil
})
}
func (ctrl *ControlPlaneController) manageSchedulerConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
return safe.WriterModify(ctx, r, k8s.NewSchedulerConfig(), func(r *k8s.SchedulerConfig) error {
*r.TypedSpec() = k8s.SchedulerConfigSpec{
Enabled: !cfgProvider.Machine().Controlplane().Scheduler().Disabled(),
Image: cfgProvider.Cluster().Scheduler().Image(),
ExtraArgs: cfgProvider.Cluster().Scheduler().ExtraArgs(),
ExtraVolumes: convertVolumes(cfgProvider.Cluster().Scheduler().ExtraVolumes()),
EnvironmentVariables: cfgProvider.Cluster().Scheduler().Env(),
}
return nil
})
}
func (ctrl *ControlPlaneController) manageManifestsConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
dnsServiceIPs, err := cfgProvider.Cluster().Network().DNSServiceIPs()
if err != nil {
return fmt.Errorf("error calculating DNS service IPs: %w", err)
}
dnsServiceIP := ""
dnsServiceIPv6 := ""
for _, ip := range dnsServiceIPs {
if dnsServiceIP == "" && ip.Is4() {
dnsServiceIP = ip.String()
}
if dnsServiceIPv6 == "" && ip.Is6() {
dnsServiceIPv6 = ip.String()
}
}
return safe.WriterModify(ctx, r, k8s.NewBootstrapManifestsConfig(), func(r *k8s.BootstrapManifestsConfig) error {
images := images.List(cfgProvider)
proxyArgs, err := getProxyArgs(cfgProvider)
if err != nil {
return err
}
var server string
if cfgProvider.Machine().Features().APIServerBalancer().Enabled() {
server = fmt.Sprintf("https://localhost:%d", cfgProvider.Machine().Features().APIServerBalancer().Port())
} else {
server = cfgProvider.Cluster().Endpoint().String()
}
*r.TypedSpec() = k8s.BootstrapManifestsConfigSpec{
Server: server,
ClusterDomain: cfgProvider.Cluster().Network().DNSDomain(),
PodCIDRs: cfgProvider.Cluster().Network().PodCIDRs(),
ProxyEnabled: cfgProvider.Cluster().Proxy().Enabled(),
ProxyImage: cfgProvider.Cluster().Proxy().Image(),
ProxyArgs: proxyArgs,
CoreDNSEnabled: cfgProvider.Cluster().CoreDNS().Enabled(),
CoreDNSImage: cfgProvider.Cluster().CoreDNS().Image(),
DNSServiceIP: dnsServiceIP,
DNSServiceIPv6: dnsServiceIPv6,
FlannelEnabled: cfgProvider.Cluster().Network().CNI().Name() == constants.FlannelCNI,
FlannelImage: images.Flannel,
FlannelCNIImage: images.FlannelCNI,
PodSecurityPolicyEnabled: !cfgProvider.Cluster().APIServer().DisablePodSecurityPolicy(),
TalosAPIServiceEnabled: cfgProvider.Machine().Features().KubernetesTalosAPIAccess().Enabled(),
}
return nil
})
}
func (ctrl *ControlPlaneController) manageExtraManifestsConfig(ctx context.Context, r controller.Runtime, _ *zap.Logger, cfgProvider talosconfig.Config) error {
return safe.WriterModify(ctx, r, k8s.NewExtraManifestsConfig(), func(r *k8s.ExtraManifestsConfig) error {
spec := k8s.ExtraManifestsConfigSpec{}
for _, url := range cfgProvider.Cluster().Network().CNI().URLs() {
spec.ExtraManifests = append(spec.ExtraManifests, k8s.ExtraManifest{
Name: url,
URL: url,
Priority: "05", // push CNI to the top
})
}
for _, url := range cfgProvider.Cluster().ExternalCloudProvider().ManifestURLs() {
spec.ExtraManifests = append(spec.ExtraManifests, k8s.ExtraManifest{
Name: url,
URL: url,
Priority: "30", // after default manifests
})
}
for _, url := range cfgProvider.Cluster().ExtraManifestURLs() {
spec.ExtraManifests = append(spec.ExtraManifests, k8s.ExtraManifest{
Name: url,
URL: url,
Priority: "99", // make sure extra manifests come last, when PSP is already created
ExtraHeaders: cfgProvider.Cluster().ExtraManifestHeaderMap(),
})
}
for _, manifest := range cfgProvider.Cluster().InlineManifests() {
spec.ExtraManifests = append(spec.ExtraManifests, k8s.ExtraManifest{
Name: manifest.Name(),
Priority: "99", // make sure extra manifests come last, when PSP is already created
InlineManifest: manifest.Contents(),
})
}
*r.TypedSpec() = spec
return nil
})
}
func (ctrl *ControlPlaneController) teardownAll(ctx context.Context, r controller.Runtime) error {
for _, md := range []*resource.Metadata{
k8s.NewAPIServerConfig().Metadata(),
k8s.NewAdmissionControlConfig().Metadata(),
k8s.NewAuditPolicyConfig().Metadata(),
k8s.NewControllerManagerConfig().Metadata(),
k8s.NewSchedulerConfig().Metadata(),
k8s.NewBootstrapManifestsConfig().Metadata(),
k8s.NewExtraManifestsConfig().Metadata(),
} {
if err := r.Destroy(ctx, md); err != nil && !state.IsNotFoundError(err) {
return fmt.Errorf("error destroying resources: %w", err)
}
}
return nil
}
func getProxyArgs(cfgProvider talosconfig.Config) ([]string, error) {
clusterCidr := strings.Join(cfgProvider.Cluster().Network().PodCIDRs(), ",")
builder := argsbuilder.Args{
"cluster-cidr": clusterCidr,
"hostname-override": "$(NODE_NAME)",
"kubeconfig": "/etc/kubernetes/kubeconfig",
"proxy-mode": cfgProvider.Cluster().Proxy().Mode(),
"conntrack-max-per-core": "0",
}
policies := argsbuilder.MergePolicies{
"kubeconfig": argsbuilder.MergeDenied,
}
if err := builder.Merge(cfgProvider.Cluster().Proxy().ExtraArgs(), argsbuilder.WithMergePolicies(policies)); err != nil {
return nil, err
}
return builder.Args(), nil
}